Posted to commits@druid.apache.org by "kfaraz (via GitHub)" <gi...@apache.org> on 2023/05/16 16:00:40 UTC

[GitHub] [druid] kfaraz opened a new pull request, #14293: Remove `giant` lock from Overlord TaskQueue

kfaraz opened a new pull request, #14293:
URL: https://github.com/apache/druid/pull/14293

   ### Description
   
   The `TaskQueue` in the Overlord implements concurrency control using a `giant` lock. A similar technique is used in other classes such as `TaskMaster` and `TaskLockbox`. While this `giant` lock does guarantee thread-safe access to critical sections of code, it can be overly restrictive at times and can even leave the Overlord completely stuck.
   
   A typical scenario is described below.
   - Insertion of a sub-task of an `index_parallel` task fails with a `SQLTransientException` (say, due to an oversized payload)
   - The `index_parallel` task repeatedly requests the Overlord to insert this sub-task
   - Each time, the Overlord tries to insert this task up to 10 times (using `RetryUtils`)
   - While the Overlord is trying to insert, the calling thread holds the `TaskQueue.giant` lock
   - This effectively hangs the Overlord, as no other `TaskQueue` operation can proceed without the lock. This includes operations such as adding a new task, killing a task, submitting tasks to the runner for execution, and syncing from metadata storage.
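   
   A minimal sketch of why this stalls the queue, assuming a plain `ReentrantLock` in place of `TaskQueue.giant` and `Thread.sleep()` as a stand-in for each failed metadata insert plus its backoff. The class `GiantLockStall` and its methods are hypothetical, and the attempt count and delays are illustrative, not the actual `RetryUtils` settings.
   
   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.locks.ReentrantLock;
   
   class GiantLockStall
   {
     private final ReentrantLock giant = new ReentrantLock();
   
     /** Simulates a retried insert performed while holding the giant lock. */
     void insertWithRetries(int attempts, long backoffMillis, CountDownLatch lockHeld)
     {
       giant.lock();
       try {
         lockHeld.countDown();
         for (int i = 0; i < attempts; i++) {
           try {
             Thread.sleep(backoffMillis); // stand-in for a failed DB call plus backoff
           }
           catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             return;
           }
         }
       }
       finally {
         giant.unlock();
       }
     }
   
     /** Any other queue operation (add, shutdown, sync) that needs the same lock. */
     boolean tryOtherOperation(long waitMillis) throws InterruptedException
     {
       if (giant.tryLock(waitMillis, TimeUnit.MILLISECONDS)) {
         giant.unlock();
         return true;
       }
       return false;
     }
   
     /** Returns whether another operation got the lock within waitMillis. */
     static boolean demo(long waitMillis)
     {
       GiantLockStall queue = new GiantLockStall();
       CountDownLatch lockHeld = new CountDownLatch(1);
       // 10 attempts x 50 ms keep the lock held for roughly 500 ms.
       Thread inserter = new Thread(() -> queue.insertWithRetries(10, 50, lockHeld));
       inserter.start();
       try {
         lockHeld.await();
         boolean acquired = queue.tryOtherOperation(waitMillis);
         inserter.join();
         return acquired;
       }
       catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return false;
       }
     }
   
     public static void main(String[] args)
     {
       System.out.println("Other operation acquired lock: " + demo(100));
     }
   }
   ```
   
   Any operation that needs the lock (here `tryOtherOperation`) cannot make progress until the whole retry sequence finishes, which is the stall described above.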
   
   Note: The indefinite retry issue in the above scenario is also being addressed separately in #14271 
   
   ### Current implementation
   
   The `giant` lock is a reentrant lock, which behaves much like the intrinsic monitor that every Java object has (the lock used by `synchronized`). In principle, this lock could be replaced by simply making all the methods of `TaskQueue` `synchronized`.
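   
   A hedged sketch of that equivalence, using two hypothetical counters: one guarded by an explicit `ReentrantLock` in the `giant` style, the other by `synchronized` methods. Both are reentrant and provide the same mutual exclusion; what `ReentrantLock` adds is `tryLock()`, timed lock waits and optional fairness, which is what a switch to `synchronized` gives up.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.locks.ReentrantLock;
   
   class LockEquivalenceDemo
   {
     /** Runs `threads` threads that each call `action` `perThread` times, then waits for them. */
     static void hammer(Runnable action, int threads, int perThread)
     {
       List<Thread> workers = new ArrayList<>();
       for (int t = 0; t < threads; t++) {
         Thread worker = new Thread(() -> {
           for (int i = 0; i < perThread; i++) {
             action.run();
           }
         });
         workers.add(worker);
         worker.start();
       }
       for (Thread worker : workers) {
         try {
           worker.join();
         }
         catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           return;
         }
       }
     }
   
     public static void main(String[] args)
     {
       GiantLockCounter a = new GiantLockCounter();
       SynchronizedCounter b = new SynchronizedCounter();
       hammer(a::increment, 8, 10_000);
       hammer(b::increment, 8, 10_000);
       System.out.println(a.get() + " " + b.get()); // prints "80000 80000"
     }
   }
   
   /** Mutual exclusion via an explicit lock, in the style of TaskQueue.giant. */
   class GiantLockCounter
   {
     private final ReentrantLock giant = new ReentrantLock();
     private long count = 0;
   
     void increment()
     {
       giant.lock();
       try {
         count++;
       }
       finally {
         giant.unlock();
       }
     }
   
     long get()
     {
       giant.lock();
       try {
         return count;
       }
       finally {
         giant.unlock();
       }
     }
   }
   
   /** The same guarantee using the object's intrinsic monitor. */
   class SynchronizedCounter
   {
     private long count = 0;
   
     synchronized void increment()
     {
       count++;
     }
   
     synchronized long get()
     {
       return count;
     }
   }
   ```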
   
   The following operations/fields are protected by this lock.
   - Methods: `start()`, `stop()`, `syncFromStorage()`, `manage()`
   - Field `tasks`: `putIfAbsent()`, `get()`, `values()`, `remove()`
   - Field `taskFutures`: `put()`, `remove()`
   - Field `taskStorage`: `insert()`
   - Field `taskLockbox`: `add()`, `remove()`
   
   ### Proposed implementation
   
   | Method/Field | Change | Rationale |
   |--------------|---------|-----------|
   | Methods:<br>`start()`<br>`stop()`<br>`syncFromStorage()`<br>`manage()` | Make methods `synchronized` | This effectively remains the same as the current implementation |
   | Field: `tasks`<br>Type: `LinkedHashMap<String, Task>`<br>Methods: `putIfAbsent()`, `get()`, `remove()` | Use a `ConcurrentHashMap` instead in conjunction with a `BlockingDeque<String>` to maintain order | The only concurrency control needed here is at a task level, which can be easily ensured by a `ConcurrentHashMap`. The `BlockingDeque` is used to maintain the order in which task IDs were submitted to the `TaskQueue`. The updates to these data structures are made atomically using `compute()` and `computeIfAbsent()`. |
   | Field: `taskFutures`<br>Type: `HashMap<String, Future>` | Replace with `ConcurrentHashSet<String> submittedTaskIds` | This is just needed to maintain a set of task ids that have already been submitted to the `TaskRunner` |
   | Field: `TaskStorage`<br>Methods: `insert()` | Move outside critical section | `TaskStorage` implementations do not maintain any state and thus don't require any concurrency control |
   | Field: `TaskLockbox`<br>Methods: `add()`, `remove()` | Move outside critical section | `TaskLockbox` has its own `giant` lock and can thus be safely accessed here. |
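   
   A minimal sketch of the proposed `tasks` design, assuming task payloads are plain strings keyed by task ID (the real queue stores `Task` objects). `OrderedTaskRegistry`, `taskIdOrder` and `markSubmitted` are hypothetical names, and `ConcurrentHashMap.newKeySet()` is used here as one JDK way to obtain a concurrent set for `submittedTaskIds`. Every deque or set update happens inside `computeIfAbsent()`/`computeIfPresent()`, which `ConcurrentHashMap` performs atomically per key, so no queue-wide lock is needed.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Set;
   import java.util.concurrent.BlockingDeque;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.LinkedBlockingDeque;
   
   class OrderedTaskRegistry
   {
     // Task ID -> payload; per-key atomicity comes from ConcurrentHashMap.
     private final ConcurrentHashMap<String, String> tasks = new ConcurrentHashMap<>();
     // Task IDs in the order they were added.
     private final BlockingDeque<String> taskIdOrder = new LinkedBlockingDeque<>();
     // IDs already handed to the runner.
     private final Set<String> submittedTaskIds = ConcurrentHashMap.newKeySet();
   
     /** Adds the task if its ID is absent; returns false for a duplicate ID. */
     boolean add(String taskId, String payload)
     {
       final boolean[] added = {false};
       tasks.computeIfAbsent(taskId, id -> {
         // The whole computeIfAbsent call is atomic, so the ID is appended
         // to the deque exactly once, together with the map insert.
         taskIdOrder.addLast(id);
         added[0] = true;
         return payload;
       });
       return added[0];
     }
   
     /** Removes the task, its ordering entry and its submitted flag. */
     void remove(String taskId)
     {
       tasks.computeIfPresent(taskId, (id, existing) -> {
         taskIdOrder.remove(id);
         submittedTaskIds.remove(id);
         return null; // returning null removes the mapping
       });
     }
   
     /** Returns true only for the first caller that submits a present task. */
     boolean markSubmitted(String taskId)
     {
       final boolean[] first = {false};
       tasks.computeIfPresent(taskId, (id, payload) -> {
         first[0] = submittedTaskIds.add(id);
         return payload; // keep the mapping unchanged
       });
       return first[0];
     }
   
     /** Snapshot of present task IDs in insertion order. */
     List<String> idsInOrder()
     {
       List<String> ids = new ArrayList<>();
       // The deque iterator is weakly consistent and never throws ConcurrentModificationException.
       for (String id : taskIdOrder) {
         if (tasks.containsKey(id)) {
           ids.add(id);
         }
       }
       return ids;
     }
   
     public static void main(String[] args)
     {
       OrderedTaskRegistry registry = new OrderedTaskRegistry();
       registry.add("task-a", "{}");
       registry.add("task-b", "{}");
       registry.add("task-c", "{}");
       registry.remove("task-b");
       System.out.println(registry.idsInOrder()); // prints [task-a, task-c]
       System.out.println(registry.markSubmitted("task-a")); // prints true
       System.out.println(registry.markSubmitted("task-a")); // prints false
     }
   }
   ```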
   
   ### Other changes
   - 
   
   <hr>
   
   This PR has:
   
   - [ ] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] a release note entry in the PR description.
   - [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [x] added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
   - [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] kfaraz commented on a diff in pull request #14293: Remove `giant` lock from Overlord TaskQueue

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1195851876


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +153,248 @@ public TaskQueue(
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared after notifyStatus is processed

Review Comment:
   Yeah, my thoughts exactly. Calling `requestManagement()` does not guarantee that management has actually happened. I am not 100% sure what was going on here, so I left it as is. I will double-check this code and maybe add some tests.





[GitHub] [druid] kfaraz commented on a diff in pull request #14293: Remove `giant` lock from Overlord TaskQueue

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1200033721


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +153,248 @@ public TaskQueue(
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared after notifyStatus is processed

Review Comment:
   I checked the code. This comment is outdated/irrelevant now.
   
   Firstly, calling `requestManagement()` just adds a request to the queue and does not ensure that task management has actually taken place. In fact, on startup, management typically would not take place until after 1 minute (the default start delay).
   
   Secondly, even if `requestManagement()` did need to be called before clearing the dangling locks for some other reason, it would already have been called in the preceding for loop (`shutdown` calls `notifyStatus`, which calls `requestManagement`).
   
   I am removing this comment and moving the contents of the next for loop into the first one. I am also adding some tests to ensure cleanup of such dangling locks.
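   
   A minimal sketch of the merged loop described above, assuming a hypothetical `LockStorage` interface that mirrors the `getLocks()`/`removeLock()` calls in the diff, with lock objects simplified to strings. `shutdownLog` is a stand-in for the real `shutdown(taskId, reason)` call. The in-memory `getLocks()` returns a copy, so locks can be removed while iterating.
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.LinkedHashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   
   class StartupLockCleanup
   {
     /** Hypothetical stand-in for the lock methods of TaskStorage. */
     interface LockStorage
     {
       List<String> getLocks(String taskId);
   
       void removeLock(String taskId, String lock);
     }
   
     /** In-memory LockStorage for the demo; getLocks() returns a copy. */
     static class InMemoryLockStorage implements LockStorage
     {
       final Map<String, List<String>> locks = new HashMap<>();
   
       @Override
       public List<String> getLocks(String taskId)
       {
         return new ArrayList<>(locks.getOrDefault(taskId, new ArrayList<>()));
       }
   
       @Override
       public void removeLock(String taskId, String lock)
       {
         List<String> taskLocks = locks.get(taskId);
         if (taskLocks != null) {
           taskLocks.remove(lock);
         }
       }
     }
   
     /**
      * One pass over the tasks to fail: shut each task down, then delete every
      * lock still persisted for it, including locks never reacquired in memory.
      */
     static void failTasks(Set<String> taskIdsToFail, LockStorage storage, List<String> shutdownLog)
     {
       for (String taskId : taskIdsToFail) {
         shutdownLog.add(taskId); // stand-in for shutdown(taskId, reason)
         for (String lock : storage.getLocks(taskId)) {
           storage.removeLock(taskId, lock);
         }
       }
     }
   
     public static void main(String[] args)
     {
       InMemoryLockStorage storage = new InMemoryLockStorage();
       storage.locks.put("task-1", new ArrayList<>(List.of("lock-a", "lock-b")));
       Set<String> toFail = new LinkedHashSet<>(List.of("task-1"));
       List<String> shutdowns = new ArrayList<>();
       failTasks(toFail, storage, shutdowns);
       System.out.println(shutdowns + " " + storage.getLocks("task-1")); // prints [task-1] []
     }
   }
   ```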





[GitHub] [druid] kfaraz commented on a diff in pull request #14293: Remove `giant` lock from Overlord TaskQueue

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1195899885


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +153,248 @@ public TaskQueue(
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared after notifyStatus is processed
+    for (Task task : tasksToFail) {
+      for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+        taskStorage.removeLock(task.getId(), lock);
       }
-      managerExec.submit(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              while (true) {
-                try {
-                  manage();
-                  break;
-                }
-                catch (InterruptedException e) {
-                  log.info("Interrupted, exiting!");
-                  break;
-                }
-                catch (Exception e) {
-                  final long restartDelay = config.getRestartDelay().getMillis();
-                  log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit();
-                  try {
-                    Thread.sleep(restartDelay);
-                  }
-                  catch (InterruptedException e2) {
-                    log.info("Interrupted, exiting!");
-                    break;
-                  }
-                }
-              }
+    }
+    log.info("Cleaned up [%d] tasks which failed to reacquire locks.", tasksToFail.size());
+
+    // Submit task management job
+    managerExec.submit(
+        () -> {
+          log.info("Beginning task management in [%s].", config.getStartDelay());
+          long startDelayMillis = config.getStartDelay().getMillis();
+          while (active) {
+            try {
+              Thread.sleep(startDelayMillis);
+              runTaskManagement();
+            }
+            catch (InterruptedException e) {
+              log.info("Interrupted, stopping task management.");
+              break;
+            }
+            catch (Exception e) {
+              startDelayMillis = config.getRestartDelay().getMillis();
+              log.makeAlert(e, "Failed to manage").addData("restartDelay", startDelayMillis).emit();
             }
           }
-      );
-      ScheduledExecutors.scheduleAtFixedRate(
-          storageSyncExec,
-          config.getStorageSyncRate(),
-          new Callable<ScheduledExecutors.Signal>()
-          {
-            @Override
-            public ScheduledExecutors.Signal call()
-            {
-              try {
-                syncFromStorage();
-              }
-              catch (Exception e) {
-                if (active) {
-                  log.makeAlert(e, "Failed to sync with storage").emit();
-                }
-              }
-              if (active) {
-                return ScheduledExecutors.Signal.REPEAT;
-              } else {
-                return ScheduledExecutors.Signal.STOP;
-              }
+        }
+    );
+
+    // Schedule storage sync job
+    ScheduledExecutors.scheduleAtFixedRate(
+        storageSyncExec,
+        config.getStorageSyncRate(),
+        () -> {
+          if (!active) {
+            log.info("Stopping storage sync as TaskQueue has been stopped");
+            return ScheduledExecutors.Signal.STOP;
+          }
+
+          try {
+            syncFromStorage();
+          }
+          catch (Exception e) {
+            if (active) {
+              log.makeAlert(e, "Failed to sync with storage").emit();
             }
           }
-      );
-      requestManagement();
-      // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
-      // This is called after requesting management as locks need to be cleared after notifyStatus is processed
-      for (Task task : tasksToFail) {
-        for (TaskLock lock : taskStorage.getLocks(task.getId())) {
-          taskStorage.removeLock(task.getId(), lock);
+
+          return active ? ScheduledExecutors.Signal.REPEAT : ScheduledExecutors.Signal.STOP;
         }
-      }
-    }
-    finally {
-      giant.unlock();
-    }
+    );
   }
 
   /**
    * Shuts down the queue.
    */
   @LifecycleStop
-  public void stop()
+  public synchronized void stop()
   {
-    giant.lock();
-
-    try {
-      tasks.clear();
-      taskFutures.clear();
-      active = false;
-      managerExec.shutdownNow();
-      storageSyncExec.shutdownNow();
-      requestManagement();
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  public boolean isActive()
-  {
-    return active;
+    active = false;
+    tasks.clear();
+    submittedTaskIds.clear();
+    recentlyCompletedTaskIds.clear();
+    managerExec.shutdownNow();
+    storageSyncExec.shutdownNow();
+    requestManagement();
   }
 
   /**
    * Request management from the management thread. Non-blocking.

Review Comment:
   Reverted to using a `BlockingQueue`, as it is in the same spirit as this PR: minimizing locking.
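   
   A hedged sketch of the `BlockingQueue` wake-up pattern: `requestManagement()` only offers a token and returns immediately, so it never blocks the caller or takes a lock the caller might already hold, and the management thread wakes up as soon as a token arrives or its timeout elapses. As noted earlier in the review, a request only schedules management; it does not mean management has already run. The class `ManagementWakeup`, the queue capacity of 1, the 10-second timeout and the 100 ms delay are all assumptions for illustration.
   
   ```java
   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.TimeUnit;
   
   class ManagementWakeup
   {
     // Capacity 1 (an assumption): concurrent requests collapse into one pending token.
     private final BlockingQueue<Object> managementRequestQueue = new ArrayBlockingQueue<>(1);
   
     /** Non-blocking: returns immediately, whether or not a token was already pending. */
     void requestManagement()
     {
       managementRequestQueue.offer(this);
     }
   
     /** Blocks until a request arrives or the timeout elapses; true if a request arrived. */
     boolean awaitRequest(long timeoutMillis) throws InterruptedException
     {
       return managementRequestQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS) != null;
     }
   
     /**
      * A manager thread waits up to 10 s for a request; the caller requests management
      * after ~100 ms. Returns how long the manager waited in ms, or -1 if it timed out.
      */
     static long demoWakeupMillis()
     {
       ManagementWakeup queue = new ManagementWakeup();
       long[] waitedMillis = {-1};
       Thread manager = new Thread(() -> {
         long start = System.nanoTime();
         try {
           if (queue.awaitRequest(10_000)) {
             waitedMillis[0] = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
           }
         }
         catch (InterruptedException e) {
           Thread.currentThread().interrupt();
         }
       });
       manager.start();
       try {
         Thread.sleep(100);
         queue.requestManagement();
         manager.join(); // join() makes waitedMillis[0] visible to this thread
       }
       catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
       return waitedMillis[0];
     }
   
     public static void main(String[] args)
     {
       System.out.println("Manager woke after ~" + demoWakeupMillis() + " ms");
     }
   }
   ```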





[GitHub] [druid] github-code-scanning[bot] commented on a diff in pull request #14293: Remove `giant` lock from Overlord TaskQueue

Posted by "github-code-scanning[bot] (via GitHub)" <gi...@apache.org>.
github-code-scanning[bot] commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1196731948


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +158,245 @@
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared after notifyStatus is processed
+    for (Task task : tasksToFail) {
+      for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+        taskStorage.removeLock(task.getId(), lock);
       }
-      managerExec.submit(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              while (true) {
-                try {
-                  manage();
-                  break;
-                }
-                catch (InterruptedException e) {
-                  log.info("Interrupted, exiting!");
-                  break;
-                }
-                catch (Exception e) {
-                  final long restartDelay = config.getRestartDelay().getMillis();
-                  log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit();
-                  try {
-                    Thread.sleep(restartDelay);
-                  }
-                  catch (InterruptedException e2) {
-                    log.info("Interrupted, exiting!");
-                    break;
-                  }
-                }
-              }
+    }
+    log.info("Cleaned up [%d] tasks which failed to reacquire locks.", tasksToFail.size());
+
+    // Submit task management job
+    managerExec.submit(
+        () -> {
+          log.info("Beginning task management in [%s].", config.getStartDelay());
+          long startDelayMillis = config.getStartDelay().getMillis();
+          while (active) {
+            try {
+              Thread.sleep(startDelayMillis);
+              runTaskManagement();
+            }
+            catch (InterruptedException e) {
+              log.info("Interrupted, stopping task management.");
+              break;
+            }
+            catch (Exception e) {
+              startDelayMillis = config.getRestartDelay().getMillis();
+              log.makeAlert(e, "Failed to manage").addData("restartDelay", startDelayMillis).emit();
             }
           }
-      );
-      ScheduledExecutors.scheduleAtFixedRate(
-          storageSyncExec,
-          config.getStorageSyncRate(),
-          new Callable<ScheduledExecutors.Signal>()
-          {
-            @Override
-            public ScheduledExecutors.Signal call()
-            {
-              try {
-                syncFromStorage();
-              }
-              catch (Exception e) {
-                if (active) {
-                  log.makeAlert(e, "Failed to sync with storage").emit();
-                }
-              }
-              if (active) {
-                return ScheduledExecutors.Signal.REPEAT;
-              } else {
-                return ScheduledExecutors.Signal.STOP;
-              }
+        }
+    );
+
+    // Schedule storage sync job
+    ScheduledExecutors.scheduleAtFixedRate(
+        storageSyncExec,
+        config.getStorageSyncRate(),
+        () -> {
+          if (!active) {
+            log.info("Stopping storage sync as TaskQueue has been stopped");
+            return ScheduledExecutors.Signal.STOP;
+          }
+
+          try {
+            syncFromStorage();
+          }
+          catch (Exception e) {
+            if (active) {
+              log.makeAlert(e, "Failed to sync with storage").emit();
             }
           }
-      );
-      requestManagement();
-      // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
-      // This is called after requesting management as locks need to be cleared after notifyStatus is processed
-      for (Task task : tasksToFail) {
-        for (TaskLock lock : taskStorage.getLocks(task.getId())) {
-          taskStorage.removeLock(task.getId(), lock);
+
+          return active ? ScheduledExecutors.Signal.REPEAT : ScheduledExecutors.Signal.STOP;
         }
-      }
-    }
-    finally {
-      giant.unlock();
-    }
+    );
   }
 
   /**
    * Shuts down the queue.
    */
   @LifecycleStop
-  public void stop()
-  {
-    giant.lock();
-
-    try {
-      tasks.clear();
-      taskFutures.clear();
-      active = false;
-      managerExec.shutdownNow();
-      storageSyncExec.shutdownNow();
-      requestManagement();
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  public boolean isActive()
+  public synchronized void stop()
   {
-    return active;
+    active = false;
+    tasks.clear();
+    submittedTaskIds.clear();
+    recentlyCompletedTaskIds.clear();
+    managerExec.shutdownNow();
+    storageSyncExec.shutdownNow();
+    requestManagement();
   }
 
   /**
    * Request management from the management thread. Non-blocking.
-   *
-   * Other callers (such as notifyStatus) should trigger activity on the
-   * TaskQueue thread by requesting management here.
+   * <p>
+   * Callers (such as notifyStatus) can trigger task management by calling
+   * this method.
    */
-  void requestManagement()
+  private void requestManagement()
   {
-    // use a BlockingQueue since the offer/poll/wait behaviour is simple
-    // and very easy to reason about
-
-    // the request has to be offer (non blocking), since someone might request
-    // while already holding giant lock
-
     // do not care if the item fits into the queue:
     // if the queue is already full, request has been triggered anyway
-    managementMayBeNecessary.offer(this);
+    managementRequestQueue.offer(this);

Review Comment:
   ## Ignored error status of call
   
   Method requestManagement ignores exceptional return value of BlockingQueue<Object>.offer.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4230)
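   
   The ignored return value looks intentional here: per the code comment in the diff, if the queue is already full, a request has been triggered anyway, so `offer()` returning `false` is harmless. One hedged way to make that explicit (and give tests something to assert on) is to return the result, as in this hypothetical sketch that assumes a capacity-1 queue; `CoalescingRequests` and `takePendingRequest` are illustrative names only.
   
   ```java
   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.BlockingQueue;
   
   class CoalescingRequests
   {
     // Capacity 1 (an assumption): at most one pending management request.
     private final BlockingQueue<Object> managementRequestQueue = new ArrayBlockingQueue<>(1);
   
     /**
      * Returns true if a new request was queued and false if one was already
      * pending. Both outcomes mean management will run, so callers may ignore it.
      */
     boolean requestManagement()
     {
       return managementRequestQueue.offer(this);
     }
   
     /** Consumes the pending request, if any, as the management thread would. */
     boolean takePendingRequest()
     {
       return managementRequestQueue.poll() != null;
     }
   
     public static void main(String[] args)
     {
       CoalescingRequests queue = new CoalescingRequests();
       System.out.println(queue.requestManagement()); // prints true: request queued
       System.out.println(queue.requestManagement()); // prints false: coalesced with the pending one
       System.out.println(queue.takePendingRequest()); // prints true
       System.out.println(queue.takePendingRequest()); // prints false: nothing left
     }
   }
   ```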





[GitHub] [druid] kfaraz commented on a diff in pull request #14293: Remove `giant` lock from Overlord TaskQueue

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1195882867


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +153,248 @@ public TaskQueue(
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared after notifyStatus is processed
+    for (Task task : tasksToFail) {
+      for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+        taskStorage.removeLock(task.getId(), lock);
       }
-      managerExec.submit(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              while (true) {
-                try {
-                  manage();
-                  break;
-                }
-                catch (InterruptedException e) {
-                  log.info("Interrupted, exiting!");
-                  break;
-                }
-                catch (Exception e) {
-                  final long restartDelay = config.getRestartDelay().getMillis();
-                  log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit();
-                  try {
-                    Thread.sleep(restartDelay);
-                  }
-                  catch (InterruptedException e2) {
-                    log.info("Interrupted, exiting!");
-                    break;
-                  }
-                }
-              }
+    }
+    log.info("Cleaned up [%d] tasks which failed to reacquire locks.", tasksToFail.size());
+
+    // Submit task management job
+    managerExec.submit(
+        () -> {
+          log.info("Beginning task management in [%s].", config.getStartDelay());
+          long startDelayMillis = config.getStartDelay().getMillis();
+          while (active) {
+            try {
+              Thread.sleep(startDelayMillis);
+              runTaskManagement();
+            }
+            catch (InterruptedException e) {
+              log.info("Interrupted, stopping task management.");
+              break;
+            }
+            catch (Exception e) {
+              startDelayMillis = config.getRestartDelay().getMillis();
+              log.makeAlert(e, "Failed to manage").addData("restartDelay", startDelayMillis).emit();
             }
           }
-      );
-      ScheduledExecutors.scheduleAtFixedRate(
-          storageSyncExec,
-          config.getStorageSyncRate(),
-          new Callable<ScheduledExecutors.Signal>()
-          {
-            @Override
-            public ScheduledExecutors.Signal call()
-            {
-              try {
-                syncFromStorage();
-              }
-              catch (Exception e) {
-                if (active) {
-                  log.makeAlert(e, "Failed to sync with storage").emit();
-                }
-              }
-              if (active) {
-                return ScheduledExecutors.Signal.REPEAT;
-              } else {
-                return ScheduledExecutors.Signal.STOP;
-              }
+        }
+    );
+
+    // Schedule storage sync job
+    ScheduledExecutors.scheduleAtFixedRate(
+        storageSyncExec,
+        config.getStorageSyncRate(),
+        () -> {
+          if (!active) {
+            log.info("Stopping storage sync as TaskQueue has been stopped");
+            return ScheduledExecutors.Signal.STOP;
+          }
+
+          try {
+            syncFromStorage();
+          }
+          catch (Exception e) {
+            if (active) {
+              log.makeAlert(e, "Failed to sync with storage").emit();
             }
           }
-      );
-      requestManagement();
-      // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
-      // This is called after requesting management as locks need to be cleared after notifyStatus is processed
-      for (Task task : tasksToFail) {
-        for (TaskLock lock : taskStorage.getLocks(task.getId())) {
-          taskStorage.removeLock(task.getId(), lock);
+
+          return active ? ScheduledExecutors.Signal.REPEAT : ScheduledExecutors.Signal.STOP;
         }
-      }
-    }
-    finally {
-      giant.unlock();
-    }
+    );
   }
 
   /**
    * Shuts down the queue.
    */
   @LifecycleStop
-  public void stop()
+  public synchronized void stop()
   {
-    giant.lock();
-
-    try {
-      tasks.clear();
-      taskFutures.clear();
-      active = false;
-      managerExec.shutdownNow();
-      storageSyncExec.shutdownNow();
-      requestManagement();
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  public boolean isActive()
-  {
-    return active;
+    active = false;
+    tasks.clear();
+    submittedTaskIds.clear();
+    recentlyCompletedTaskIds.clear();
+    managerExec.shutdownNow();
+    storageSyncExec.shutdownNow();
+    requestManagement();
   }
 
   /**
    * Request management from the management thread. Non-blocking.
-   *
-   * Other callers (such as notifyStatus) should trigger activity on the
-   * TaskQueue thread by requesting management here.
+   * <p>
+   * Callers (such as notifyStatus) can trigger task management by calling
+   * this method.
    */
-  void requestManagement()
+  private void requestManagement()
   {
-    // use a BlockingQueue since the offer/poll/wait behaviour is simple
-    // and very easy to reason about
-
-    // the request has to be offer (non blocking), since someone might request
-    // while already holding giant lock
-
-    // do not care if the item fits into the queue:
-    // if the queue is already full, request has been triggered anyway
-    managementMayBeNecessary.offer(this);
+    synchronized (managementRequested) {
+      managementRequested.set(true);
+      managementRequested.notify();
+    }
   }
 
   /**
-   * Await for an event to manage.
-   *
-   * This should only be called from the management thread to wait for activity.
+   * Waits for a management request to be triggered by another thread.
    *
-   * @param nanos
-   * @throws InterruptedException
+   * @throws InterruptedException if the thread is interrupted while waiting.
    */
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "using queue as notification mechanism, result has no value")
-  void awaitManagementNanos(long nanos) throws InterruptedException
+  private void awaitManagementRequest() throws InterruptedException
   {
     // mitigate a busy loop, it can get pretty busy when there are a lot of start/stops
     try {
-      Thread.sleep(MIN_WAIT_TIME_MS);
+      Thread.sleep(MIN_WAIT_TIME_MILLIS);
     }
     catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
 
-    // wait for an item, if an item arrives (or is already available), complete immediately
-    // (does not actually matter what the item is)
-    managementMayBeNecessary.poll(nanos - (TimeUnit.MILLISECONDS.toNanos(MIN_WAIT_TIME_MS)), TimeUnit.NANOSECONDS);
-
-    // there may have been multiple requests, clear them all
-    managementMayBeNecessary.clear();
+    // Wait for management to be requested
+    synchronized (managementRequested) {
+      while (!managementRequested.get()) {
+        managementRequested.wait(MANAGEMENT_WAIT_TIMEOUT_MILLIS - MIN_WAIT_TIME_MILLIS);
+      }
+      managementRequested.compareAndSet(true, false);
+    }
   }
 
   /**
    * Main task runner management loop. Meant to run forever, or, at least until we're stopped.
    */
-  private void manage() throws InterruptedException
+  private void runTaskManagement() throws InterruptedException
   {
-    log.info("Beginning management in %s.", config.getStartDelay());
-    Thread.sleep(config.getStartDelay().getMillis());
-
     // Ignore return value- we'll get the IDs and futures from getKnownTasks later.
     taskRunner.restore();
 
     while (active) {
-      manageInternal();
-
-      // awaitNanos because management may become necessary without this condition signalling,
-      // due to e.g. tasks becoming ready when other folks mess with the TaskLockbox.
-      awaitManagementNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS);
+      manageTasks();
+      awaitManagementRequest();
     }
   }
 
   @VisibleForTesting
-  void manageInternal()
+  void manageTasks()
   {
-    Set<String> knownTaskIds = new HashSet<>();
-    Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new HashMap<>();
-
-    giant.lock();
-
-    try {
-      manageInternalCritical(knownTaskIds, runnerTaskFutures);
-    }
-    finally {
-      giant.unlock();
-    }
-
-    manageInternalPostCritical(knownTaskIds, runnerTaskFutures);
+    runReadyTasks();
+    killUnknownTasks();
   }
 
 
   /**
-   * Management loop critical section tasks.
-   *
-   * @param knownTaskIds will be modified - filled with known task IDs
-   * @param runnerTaskFutures will be modified - filled with futures related to getting the running tasks
+   * Submits ready tasks to the TaskRunner.
+   * <p>
+   * This method should be called only by the management thread.
    */
-  @GuardedBy("giant")
-  private void manageInternalCritical(
-      final Set<String> knownTaskIds,
-      final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures
-  )
+  private synchronized void runReadyTasks()
   {
     // Task futures available from the taskRunner
+    final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new HashMap<>();
     for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) {
-      if (!recentlyCompletedTasks.contains(workItem.getTaskId())) {
-        // Don't do anything with tasks that have recently finished; notifyStatus will handle it.
+      if (!recentlyCompletedTaskIds.contains(workItem.getTaskId())) {
+        // Don't do anything with recently completed tasks; notifyStatus will handle it.
         runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
       }
     }
+
     // Attain futures for all active tasks (assuming they are ready to run).
-    // Copy tasks list, as notifyStatus may modify it.
-    for (final Task task : ImmutableList.copyOf(tasks.values())) {
-      if (recentlyCompletedTasks.contains(task.getId())) {
-        // Don't do anything with tasks that have recently finished; notifyStatus will handle it.
-        continue;
+    for (final String taskId : Lists.newArrayList(activeTaskIdQueue)) {
+      final Task task = tasks.get(taskId);
+      if (task == null || recentlyCompletedTaskIds.contains(taskId)) {
+        // Don't do anything for unknown tasks or recently completed tasks
+      } else if (submittedTaskIds.contains(taskId)) {
+        // Re-trigger execution of pending task to avoid unnecessary delays
+        // see https://github.com/apache/druid/pull/6991
+        if (isTaskPending(task)) {
+          taskRunner.run(task);
+        }
+      } else if (runnerTaskFutures.containsKey(taskId)) {
+        attachCallbacks(task, runnerTaskFutures.get(taskId));
+        submittedTaskIds.add(taskId);
+      } else if (isTaskReady(task)) {
+        log.info("Asking taskRunner to run ready task [%s].", taskId);
+        attachCallbacks(task, taskRunner.run(task));
+        submittedTaskIds.add(taskId);
+      } else {
+        // Release all locks (possibly acquired by task.isReady()) if task is not ready
+        taskLockbox.unlockAll(task);
       }
+    }
+  }
 
-      knownTaskIds.add(task.getId());
-
-      if (!taskFutures.containsKey(task.getId())) {
-        final ListenableFuture<TaskStatus> runnerTaskFuture;
-        if (runnerTaskFutures.containsKey(task.getId())) {
-          runnerTaskFuture = runnerTaskFutures.get(task.getId());
-        } else {
-          // Task should be running, so run it.
-          final boolean taskIsReady;
-          try {
-            taskIsReady = task.isReady(taskActionClientFactory.create(task));
-          }
-          catch (Exception e) {
-            log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
-            final String errorMessage;
-            if (e instanceof MaxAllowedLocksExceededException) {
-              errorMessage = e.getMessage();
-            } else {
-              errorMessage = "Failed while waiting for the task to be ready to run. "
-                                          + "See overlord logs for more details.";
-            }
-            notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage);
-            continue;
-          }
-          if (taskIsReady) {
-            log.info("Asking taskRunner to run: %s", task.getId());
-            runnerTaskFuture = taskRunner.run(task);
-          } else {
-            // Task.isReady() can internally lock intervals or segments.
-            // We should release them if the task is not ready.
-            taskLockbox.unlockAll(task);
-            continue;
-          }
-        }
-        taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));
-      } else if (isTaskPending(task)) {
-        // if the taskFutures contain this task and this task is pending, also let the taskRunner
-        // to run it to guarantee it will be assigned to run
-        // see https://github.com/apache/druid/pull/6991
-        taskRunner.run(task);
+  private boolean isTaskReady(Task task)
+  {
+    try {
+      return task.isReady(taskActionClientFactory.create(task));
+    }
+    catch (Exception e) {
+      log.warn(e, "Error while checking if task [%s] is ready to run.", task.getId());
+      final String errorMessage;
+      if (e instanceof MaxAllowedLocksExceededException) {
+        errorMessage = e.getMessage();
+      } else {
+        errorMessage = "Failed while waiting for the task to be ready to run. "
+                       + "See overlord logs for more details.";
       }
+      notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage);
+      return false;
     }
   }
 
-  @VisibleForTesting
-  private void manageInternalPostCritical(
-      final Set<String> knownTaskIds,
-      final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures
-  )
+  /**
+   * Kills tasks not present in the set of known tasks.
+   */
+  private void killUnknownTasks()
   {
-    // Kill tasks that shouldn't be running
-    final Set<String> tasksToKill = Sets.difference(runnerTaskFutures.keySet(), knownTaskIds);
-    if (!tasksToKill.isEmpty()) {
-      log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());
-
-      // On large installations running several thousands of tasks,
-      // concatenating the list of known task ids can be compupationally expensive.
-      final boolean logKnownTaskIds = log.isDebugEnabled();
-      final String reason = logKnownTaskIds
-              ? StringUtils.format("Task is not in knownTaskIds[%s]", knownTaskIds)
-              : "Task is not in knownTaskIds";
-
-      for (final String taskId : tasksToKill) {
-        try {
-          taskRunner.shutdown(taskId, reason);
-        }
-        catch (Exception e) {
-          log.warn(e, "TaskRunner failed to clean up task: %s", taskId);
-        }
+    final Set<String> knownTaskIds = tasks.keySet();

Review Comment:
   Thanks for calling this out, @jasonk000!
   
   The issue described in #12901 is essentially a race condition between `notifyStatus` and `manageInternalCritical` (renamed in this patch to `runReadyTasks`), in which a task that is being shut down might get relaunched. In #12901, this was solved by enforcing the following order of events:
   - Mark the task as `recentlyCompleted`
   - Update metadata store
   - Finish cleanup of in-memory data structures
   
   All `recentlyCompleted` tasks were ignored in `manageInternalCritical`.
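   A rough model of this ordering is sketched below. It is a simplified, hypothetical class (`MiniTaskQueue`, the `afterMarking` hook and the list-backed `metadataStore` are illustrative, not Druid APIs) that uses the queue's own monitor as the lock. The hook simulates the management thread running between steps 1 and 3: because the task is already marked as recently completed, `runReadyTasks` skips it instead of relaunching it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, heavily simplified model of the TaskQueue state involved in the race.
// MiniTaskQueue, afterMarking and metadataStore are illustrative names, not Druid APIs.
class MiniTaskQueue
{
  final Map<String, String> tasks = new HashMap<>();             // taskId -> payload
  final Set<String> recentlyCompletedTaskIds = new HashSet<>();
  final Set<String> launchedTaskIds = new HashSet<>();
  // Stand-in for the metadata store; synchronized because it is written outside the queue lock.
  final List<String> metadataStore = Collections.synchronizedList(new ArrayList<>());

  synchronized void add(String taskId)
  {
    tasks.put(taskId, "payload-" + taskId);
  }

  // afterMarking lets a caller simulate the management thread running between steps 1 and 3.
  void notifyStatus(String taskId, String status, Runnable afterMarking)
  {
    // Step 1: mark the task as recently completed, so management ignores it from now on.
    synchronized (this) {
      recentlyCompletedTaskIds.add(taskId);
    }
    afterMarking.run();

    // Step 2: update the metadata store without holding the queue lock.
    metadataStore.add(taskId + ":" + status);

    // Step 3: clean up in-memory state, including the marker itself, in one critical section.
    synchronized (this) {
      tasks.remove(taskId);
      recentlyCompletedTaskIds.remove(taskId);
    }
  }

  synchronized void runReadyTasks()
  {
    for (String taskId : tasks.keySet()) {
      if (recentlyCompletedTaskIds.contains(taskId)) {
        continue; // notifyStatus owns this task now, so never relaunch it
      }
      launchedTaskIds.add(taskId); // stand-in for taskRunner.run(task)
    }
  }
}
```

   In this sketch, step 2 runs without holding the queue lock, so a slow metadata write does not block other queue operations; whether the real `notifyStatus` does the same is not shown in this excerpt.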
   
   In the new set of changes, I have retained this behaviour in full. As soon as a task is marked as `recentlyCompleted`, neither `runReadyTasks` nor `killUnknownTasks` will touch it. The in-memory data structures (including `recentlyCompleted` itself) are cleaned up atomically, and only once the task shutdown has finished.
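   The `killUnknownTasks` side of this guarantee can be sketched as below, assuming a simplified class in which `taskIds` stands in for the keys of `tasks` and the runner's known task IDs are passed in as a set (`UnknownTaskKiller` and its members are hypothetical). The decision about which tasks to kill is made under the queue's monitor, while the shutdown calls themselves run outside it, in the spirit of the diff, where `killUnknownTasks` is not declared `synchronized`.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch: decide under the lock, act outside it. Not the actual TaskQueue code.
class UnknownTaskKiller
{
  final Set<String> taskIds = new HashSet<>();                   // stand-in for tasks.keySet()
  final Set<String> recentlyCompletedTaskIds = new HashSet<>();

  // runnerTaskIds: tasks the runner knows about; shutdown: stand-in for taskRunner.shutdown(...)
  void killUnknownTasks(Set<String> runnerTaskIds, Consumer<String> shutdown)
  {
    final Set<String> toKill = new HashSet<>(runnerTaskIds);
    synchronized (this) {
      // Never kill tasks the queue tracks or tasks whose completion is still being handled.
      toKill.removeAll(taskIds);
      toKill.removeAll(recentlyCompletedTaskIds);
    }
    // Potentially slow runner calls happen without holding the queue lock.
    toKill.forEach(shutdown);
  }
}
```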
   
   I will try to add some tests for this scenario. Please let me know if you think of any other race conditions; it would be nice to have tests for all of them.




[GitHub] [druid] kfaraz commented on a diff in pull request #14293: Remove `giant` lock from Overlord TaskQueue

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1195888479


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +153,248 @@ public TaskQueue(
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared after notifyStatus is processed
+    for (Task task : tasksToFail) {
+      for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+        taskStorage.removeLock(task.getId(), lock);
       }
-      managerExec.submit(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              while (true) {
-                try {
-                  manage();
-                  break;
-                }
-                catch (InterruptedException e) {
-                  log.info("Interrupted, exiting!");
-                  break;
-                }
-                catch (Exception e) {
-                  final long restartDelay = config.getRestartDelay().getMillis();
-                  log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit();
-                  try {
-                    Thread.sleep(restartDelay);
-                  }
-                  catch (InterruptedException e2) {
-                    log.info("Interrupted, exiting!");
-                    break;
-                  }
-                }
-              }
+    }
+    log.info("Cleaned up [%d] tasks which failed to reacquire locks.", tasksToFail.size());
+
+    // Submit task management job
+    managerExec.submit(
+        () -> {
+          log.info("Beginning task management in [%s].", config.getStartDelay());
+          long startDelayMillis = config.getStartDelay().getMillis();
+          while (active) {
+            try {
+              Thread.sleep(startDelayMillis);
+              runTaskManagement();
+            }
+            catch (InterruptedException e) {
+              log.info("Interrupted, stopping task management.");
+              break;
+            }
+            catch (Exception e) {
+              startDelayMillis = config.getRestartDelay().getMillis();
+              log.makeAlert(e, "Failed to manage").addData("restartDelay", startDelayMillis).emit();
             }
           }
-      );
-      ScheduledExecutors.scheduleAtFixedRate(
-          storageSyncExec,
-          config.getStorageSyncRate(),
-          new Callable<ScheduledExecutors.Signal>()
-          {
-            @Override
-            public ScheduledExecutors.Signal call()
-            {
-              try {
-                syncFromStorage();
-              }
-              catch (Exception e) {
-                if (active) {
-                  log.makeAlert(e, "Failed to sync with storage").emit();
-                }
-              }
-              if (active) {
-                return ScheduledExecutors.Signal.REPEAT;
-              } else {
-                return ScheduledExecutors.Signal.STOP;
-              }
+        }
+    );
+
+    // Schedule storage sync job
+    ScheduledExecutors.scheduleAtFixedRate(
+        storageSyncExec,
+        config.getStorageSyncRate(),
+        () -> {
+          if (!active) {
+            log.info("Stopping storage sync as TaskQueue has been stopped");
+            return ScheduledExecutors.Signal.STOP;
+          }
+
+          try {
+            syncFromStorage();
+          }
+          catch (Exception e) {
+            if (active) {
+              log.makeAlert(e, "Failed to sync with storage").emit();
             }
           }
-      );
-      requestManagement();
-      // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
-      // This is called after requesting management as locks need to be cleared after notifyStatus is processed
-      for (Task task : tasksToFail) {
-        for (TaskLock lock : taskStorage.getLocks(task.getId())) {
-          taskStorage.removeLock(task.getId(), lock);
+
+          return active ? ScheduledExecutors.Signal.REPEAT : ScheduledExecutors.Signal.STOP;
         }
-      }
-    }
-    finally {
-      giant.unlock();
-    }
+    );
   }
 
   /**
    * Shuts down the queue.
    */
   @LifecycleStop
-  public void stop()
+  public synchronized void stop()
   {
-    giant.lock();
-
-    try {
-      tasks.clear();
-      taskFutures.clear();
-      active = false;
-      managerExec.shutdownNow();
-      storageSyncExec.shutdownNow();
-      requestManagement();
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  public boolean isActive()
-  {
-    return active;
+    active = false;
+    tasks.clear();
+    submittedTaskIds.clear();
+    recentlyCompletedTaskIds.clear();
+    managerExec.shutdownNow();
+    storageSyncExec.shutdownNow();
+    requestManagement();
   }
 
   /**
    * Request management from the management thread. Non-blocking.
-   *
-   * Other callers (such as notifyStatus) should trigger activity on the
-   * TaskQueue thread by requesting management here.
+   * <p>
+   * Callers (such as notifyStatus) can trigger task management by calling
+   * this method.
    */
-  void requestManagement()
+  private void requestManagement()
   {
-    // use a BlockingQueue since the offer/poll/wait behaviour is simple
-    // and very easy to reason about
-
-    // the request has to be offer (non blocking), since someone might request
-    // while already holding giant lock
-
-    // do not care if the item fits into the queue:
-    // if the queue is already full, request has been triggered anyway
-    managementMayBeNecessary.offer(this);
+    synchronized (managementRequested) {
+      managementRequested.set(true);
+      managementRequested.notify();
+    }
   }
 
   /**
-   * Await for an event to manage.
-   *
-   * This should only be called from the management thread to wait for activity.
+   * Waits for a management request to be triggered by another thread.
    *
-   * @param nanos
-   * @throws InterruptedException
+   * @throws InterruptedException if the thread is interrupted while waiting.
    */
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "using queue as notification mechanism, result has no value")
-  void awaitManagementNanos(long nanos) throws InterruptedException
+  private void awaitManagementRequest() throws InterruptedException
   {
     // mitigate a busy loop, it can get pretty busy when there are a lot of start/stops
     try {
-      Thread.sleep(MIN_WAIT_TIME_MS);
+      Thread.sleep(MIN_WAIT_TIME_MILLIS);
     }
     catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
 
-    // wait for an item, if an item arrives (or is already available), complete immediately
-    // (does not actually matter what the item is)
-    managementMayBeNecessary.poll(nanos - (TimeUnit.MILLISECONDS.toNanos(MIN_WAIT_TIME_MS)), TimeUnit.NANOSECONDS);
-
-    // there may have been multiple requests, clear them all
-    managementMayBeNecessary.clear();
+    // Wait for management to be requested
+    synchronized (managementRequested) {
+      while (!managementRequested.get()) {
+        managementRequested.wait(MANAGEMENT_WAIT_TIMEOUT_MILLIS - MIN_WAIT_TIME_MILLIS);
+      }
+      managementRequested.compareAndSet(true, false);
+    }
   }
 
   /**
    * Main task runner management loop. Meant to run forever, or, at least until we're stopped.
    */
-  private void manage() throws InterruptedException
+  private void runTaskManagement() throws InterruptedException
   {
-    log.info("Beginning management in %s.", config.getStartDelay());
-    Thread.sleep(config.getStartDelay().getMillis());
-
     // Ignore return value- we'll get the IDs and futures from getKnownTasks later.
     taskRunner.restore();
 
     while (active) {
-      manageInternal();
-
-      // awaitNanos because management may become necessary without this condition signalling,
-      // due to e.g. tasks becoming ready when other folks mess with the TaskLockbox.
-      awaitManagementNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS);
+      manageTasks();
+      awaitManagementRequest();
     }
   }
 
   @VisibleForTesting
-  void manageInternal()
+  void manageTasks()
   {
-    Set<String> knownTaskIds = new HashSet<>();
-    Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new HashMap<>();
-
-    giant.lock();
-
-    try {
-      manageInternalCritical(knownTaskIds, runnerTaskFutures);
-    }
-    finally {
-      giant.unlock();
-    }
-
-    manageInternalPostCritical(knownTaskIds, runnerTaskFutures);
+    runReadyTasks();
+    killUnknownTasks();
   }
 
 
   /**
-   * Management loop critical section tasks.
-   *
-   * @param knownTaskIds will be modified - filled with known task IDs
-   * @param runnerTaskFutures will be modified - filled with futures related to getting the running tasks
+   * Submits ready tasks to the TaskRunner.
+   * <p>
+   * This method should be called only by the management thread.
    */
-  @GuardedBy("giant")
-  private void manageInternalCritical(
-      final Set<String> knownTaskIds,
-      final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures
-  )
+  private synchronized void runReadyTasks()
   {
     // Task futures available from the taskRunner
+    final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new HashMap<>();
     for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) {
-      if (!recentlyCompletedTasks.contains(workItem.getTaskId())) {
-        // Don't do anything with tasks that have recently finished; notifyStatus will handle it.
+      if (!recentlyCompletedTaskIds.contains(workItem.getTaskId())) {
+        // Don't do anything with recently completed tasks; notifyStatus will handle it.
         runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
       }
     }
+
     // Attain futures for all active tasks (assuming they are ready to run).
-    // Copy tasks list, as notifyStatus may modify it.
-    for (final Task task : ImmutableList.copyOf(tasks.values())) {
-      if (recentlyCompletedTasks.contains(task.getId())) {
-        // Don't do anything with tasks that have recently finished; notifyStatus will handle it.
-        continue;
+    for (final String taskId : Lists.newArrayList(activeTaskIdQueue)) {
+      final Task task = tasks.get(taskId);
+      if (task == null || recentlyCompletedTaskIds.contains(taskId)) {
+        // Don't do anything for unknown tasks or recently completed tasks
+      } else if (submittedTaskIds.contains(taskId)) {
+        // Re-trigger execution of pending task to avoid unnecessary delays
+        // see https://github.com/apache/druid/pull/6991
+        if (isTaskPending(task)) {
+          taskRunner.run(task);
+        }
+      } else if (runnerTaskFutures.containsKey(taskId)) {
+        attachCallbacks(task, runnerTaskFutures.get(taskId));
+        submittedTaskIds.add(taskId);
+      } else if (isTaskReady(task)) {
+        log.info("Asking taskRunner to run ready task [%s].", taskId);
+        attachCallbacks(task, taskRunner.run(task));
+        submittedTaskIds.add(taskId);
+      } else {
+        // Release all locks (possibly acquired by task.isReady()) if task is not ready
+        taskLockbox.unlockAll(task);
       }
+    }
+  }
 
-      knownTaskIds.add(task.getId());
-
-      if (!taskFutures.containsKey(task.getId())) {
-        final ListenableFuture<TaskStatus> runnerTaskFuture;
-        if (runnerTaskFutures.containsKey(task.getId())) {
-          runnerTaskFuture = runnerTaskFutures.get(task.getId());
-        } else {
-          // Task should be running, so run it.
-          final boolean taskIsReady;
-          try {
-            taskIsReady = task.isReady(taskActionClientFactory.create(task));
-          }
-          catch (Exception e) {
-            log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
-            final String errorMessage;
-            if (e instanceof MaxAllowedLocksExceededException) {
-              errorMessage = e.getMessage();
-            } else {
-              errorMessage = "Failed while waiting for the task to be ready to run. "
-                                          + "See overlord logs for more details.";
-            }
-            notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage);
-            continue;
-          }
-          if (taskIsReady) {
-            log.info("Asking taskRunner to run: %s", task.getId());
-            runnerTaskFuture = taskRunner.run(task);
-          } else {
-            // Task.isReady() can internally lock intervals or segments.
-            // We should release them if the task is not ready.
-            taskLockbox.unlockAll(task);
-            continue;
-          }
-        }
-        taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));
-      } else if (isTaskPending(task)) {
-        // if the taskFutures contain this task and this task is pending, also let the taskRunner
-        // to run it to guarantee it will be assigned to run
-        // see https://github.com/apache/druid/pull/6991
-        taskRunner.run(task);
+  private boolean isTaskReady(Task task)
+  {
+    try {
+      return task.isReady(taskActionClientFactory.create(task));
+    }
+    catch (Exception e) {
+      log.warn(e, "Error while checking if task [%s] is ready to run.", task.getId());
+      final String errorMessage;
+      if (e instanceof MaxAllowedLocksExceededException) {
+        errorMessage = e.getMessage();
+      } else {
+        errorMessage = "Failed while waiting for the task to be ready to run. "
+                       + "See overlord logs for more details.";
       }
+      notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage);
+      return false;
     }
   }
 
-  @VisibleForTesting
-  private void manageInternalPostCritical(
-      final Set<String> knownTaskIds,
-      final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures
-  )
+  /**
+   * Kills tasks not present in the set of known tasks.
+   */
+  private void killUnknownTasks()
   {
-    // Kill tasks that shouldn't be running
-    final Set<String> tasksToKill = Sets.difference(runnerTaskFutures.keySet(), knownTaskIds);
-    if (!tasksToKill.isEmpty()) {
-      log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());
-
-      // On large installations running several thousands of tasks,
-      // concatenating the list of known task ids can be compupationally expensive.
-      final boolean logKnownTaskIds = log.isDebugEnabled();
-      final String reason = logKnownTaskIds
-              ? StringUtils.format("Task is not in knownTaskIds[%s]", knownTaskIds)
-              : "Task is not in knownTaskIds";
-
-      for (final String taskId : tasksToKill) {
-        try {
-          taskRunner.shutdown(taskId, reason);
-        }
-        catch (Exception e) {
-          log.warn(e, "TaskRunner failed to clean up task: %s", taskId);
-        }
+    final Set<String> knownTaskIds = tasks.keySet();

Review Comment:
   The changes in #12099 were a great help in cleaning up the `TaskQueue` in this patch.
   
   The major difference from #12099 is that I now synchronize on an `AtomicBoolean managementRequested` instead of using a queue to track whether management needs to be done. I made this choice for readability and to make the intent clearer.
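   A stand-alone sketch of such a signal follows; the class name and the `timeoutMillis` parameter are hypothetical. As in the diff, the `AtomicBoolean` serves as both the flag and the monitor, and repeated requests collapse into a single wake-up. One deliberate difference: this version returns once the timeout elapses even if nobody signalled, so management still runs periodically (as the removed `poll`-based code did), whereas the `while` loop in the diff keeps waiting until a request arrives.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-alone version of the managementRequested signal; names are illustrative.
class ManagementSignal
{
  // As in the diff, the AtomicBoolean is both the flag and the monitor used for wait/notify.
  private final AtomicBoolean managementRequested = new AtomicBoolean(false);

  // Does not wait for management to run: sets the flag and wakes the manager if it is waiting.
  void requestManagement()
  {
    synchronized (managementRequested) {
      managementRequested.set(true);
      managementRequested.notifyAll();
    }
  }

  // Returns true if a request arrived, false if timeoutMillis elapsed first.
  boolean awaitManagementRequest(long timeoutMillis) throws InterruptedException
  {
    final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    synchronized (managementRequested) {
      // Loop to tolerate spurious wake-ups, but give up once the deadline has passed.
      while (!managementRequested.get()) {
        final long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime());
        if (remainingMillis <= 0) {
          return false; // guard also avoids wait(0), which would block indefinitely
        }
        managementRequested.wait(remainingMillis);
      }
      // Clear the flag: all requests made so far are served by this single wake-up.
      managementRequested.set(false);
      return true;
    }
  }
}
```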
   
   I think you had called out this possible approach here:
   > I chose a BlockingQueue implementation because it is easy to reason about the submission / poll / offer ordering. Other options would be Semaphore etc.
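   For comparison, the queue-based signal that this patch replaces (`offer`, then `poll` with a timeout, then `clear`, per the removed lines in the diff) can be sketched like this. The class name and the queue capacity of 8 are assumptions for illustration; the original capacity is not shown in this excerpt.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the removed managementMayBeNecessary queue; capacity is an assumption.
class QueueManagementSignal
{
  private final BlockingQueue<Object> managementMayBeNecessary = new ArrayBlockingQueue<>(8);

  void requestManagement()
  {
    // offer() never blocks; if the queue is full, a request is already pending, so dropping is fine.
    managementMayBeNecessary.offer(this);
  }

  // Returns true if a request arrived, false if timeoutMillis elapsed first.
  boolean awaitManagementRequest(long timeoutMillis) throws InterruptedException
  {
    // Wait for any item; which item arrives does not matter.
    final Object request = managementMayBeNecessary.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    // Several requests may have queued up; clear them all so they count as one wake-up.
    managementMayBeNecessary.clear();
    return request != null;
  }
}
```

   Compared with the flag-based version, the queue needs an explicit `clear()` to collapse pending requests, while a boolean flag collapses them naturally.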




[GitHub] [druid] github-code-scanning[bot] commented on a diff in pull request #14293: Remove `giant` lock from Overlord TaskQueue

Posted by "github-code-scanning[bot] (via GitHub)" <gi...@apache.org>.
github-code-scanning[bot] commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1195916614


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +155,245 @@
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared after notifyStatus is processed
+    for (Task task : tasksToFail) {
+      for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+        taskStorage.removeLock(task.getId(), lock);
       }
-      managerExec.submit(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              while (true) {
-                try {
-                  manage();
-                  break;
-                }
-                catch (InterruptedException e) {
-                  log.info("Interrupted, exiting!");
-                  break;
-                }
-                catch (Exception e) {
-                  final long restartDelay = config.getRestartDelay().getMillis();
-                  log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit();
-                  try {
-                    Thread.sleep(restartDelay);
-                  }
-                  catch (InterruptedException e2) {
-                    log.info("Interrupted, exiting!");
-                    break;
-                  }
-                }
-              }
+    }
+    log.info("Cleaned up [%d] tasks which failed to reacquire locks.", tasksToFail.size());
+
+    // Submit task management job
+    managerExec.submit(
+        () -> {
+          log.info("Beginning task management in [%s].", config.getStartDelay());
+          long startDelayMillis = config.getStartDelay().getMillis();
+          while (active) {
+            try {
+              Thread.sleep(startDelayMillis);
+              runTaskManagement();
+            }
+            catch (InterruptedException e) {
+              log.info("Interrupted, stopping task management.");
+              break;
+            }
+            catch (Exception e) {
+              startDelayMillis = config.getRestartDelay().getMillis();
+              log.makeAlert(e, "Failed to manage").addData("restartDelay", startDelayMillis).emit();
             }
           }
-      );
-      ScheduledExecutors.scheduleAtFixedRate(
-          storageSyncExec,
-          config.getStorageSyncRate(),
-          new Callable<ScheduledExecutors.Signal>()
-          {
-            @Override
-            public ScheduledExecutors.Signal call()
-            {
-              try {
-                syncFromStorage();
-              }
-              catch (Exception e) {
-                if (active) {
-                  log.makeAlert(e, "Failed to sync with storage").emit();
-                }
-              }
-              if (active) {
-                return ScheduledExecutors.Signal.REPEAT;
-              } else {
-                return ScheduledExecutors.Signal.STOP;
-              }
+        }
+    );
+
+    // Schedule storage sync job
+    ScheduledExecutors.scheduleAtFixedRate(
+        storageSyncExec,
+        config.getStorageSyncRate(),
+        () -> {
+          if (!active) {
+            log.info("Stopping storage sync as TaskQueue has been stopped");
+            return ScheduledExecutors.Signal.STOP;
+          }
+
+          try {
+            syncFromStorage();
+          }
+          catch (Exception e) {
+            if (active) {
+              log.makeAlert(e, "Failed to sync with storage").emit();
             }
           }
-      );
-      requestManagement();
-      // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
-      // This is called after requesting management as locks need to be cleared after notifyStatus is processed
-      for (Task task : tasksToFail) {
-        for (TaskLock lock : taskStorage.getLocks(task.getId())) {
-          taskStorage.removeLock(task.getId(), lock);
+
+          return active ? ScheduledExecutors.Signal.REPEAT : ScheduledExecutors.Signal.STOP;
         }
-      }
-    }
-    finally {
-      giant.unlock();
-    }
+    );
   }
 
   /**
    * Shuts down the queue.
    */
   @LifecycleStop
-  public void stop()
-  {
-    giant.lock();
-
-    try {
-      tasks.clear();
-      taskFutures.clear();
-      active = false;
-      managerExec.shutdownNow();
-      storageSyncExec.shutdownNow();
-      requestManagement();
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  public boolean isActive()
+  public synchronized void stop()
   {
-    return active;
+    active = false;
+    tasks.clear();
+    submittedTaskIds.clear();
+    recentlyCompletedTaskIds.clear();
+    managerExec.shutdownNow();
+    storageSyncExec.shutdownNow();
+    requestManagement();
   }
 
   /**
    * Request management from the management thread. Non-blocking.
-   *
-   * Other callers (such as notifyStatus) should trigger activity on the
-   * TaskQueue thread by requesting management here.
+   * <p>
+   * Callers (such as notifyStatus) can trigger task management by calling
+   * this method.
    */
-  void requestManagement()
+  private void requestManagement()
   {
-    // use a BlockingQueue since the offer/poll/wait behaviour is simple
-    // and very easy to reason about
-
-    // the request has to be offer (non blocking), since someone might request
-    // while already holding giant lock
-
     // do not care if the item fits into the queue:
     // if the queue is already full, request has been triggered anyway
-    managementMayBeNecessary.offer(this);
+    managementRequestQueue.offer(new Object());

Review Comment:
   ## Ignored error status of call
   
   Method requestManagement ignores exceptional return value of BlockingQueue<Object>.offer.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4229)
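   The scanner flags the discarded `offer()` result, but here `false` is not an error: it means the queue is full, so a request is already pending. The sketch below makes that explicit by returning the result. It assumes a capacity-1 `ArrayBlockingQueue` (the real capacity is not shown in this hunk), and `QueueManagementSignal` is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; assumes capacity 1 so that "full" means "request already pending".
class QueueManagementSignal
{
  private final BlockingQueue<Object> managementRequestQueue = new ArrayBlockingQueue<>(1);

  // Returns true if this call enqueued a new request, false if one was already pending.
  boolean requestManagement()
  {
    // offer() never blocks; false only means an earlier request has not been consumed yet
    return managementRequestQueue.offer(Boolean.TRUE);
  }

  // Waits up to timeoutMillis for a pending request and consumes it.
  boolean awaitRequest(long timeoutMillis) throws InterruptedException
  {
    return managementRequestQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS) != null;
  }
}
```

   Callers that do not care can still ignore the result, but the method's contract now states why a `false` is harmless.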





Re: [PR] Remove `giant` lock from Overlord TaskQueue (druid)

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #14293:
URL: https://github.com/apache/druid/pull/14293#issuecomment-1937927800

   This pull request has been marked as stale due to 60 days of inactivity.
   It will be closed in 4 weeks if no further activity occurs. If you think
   that's incorrect or this pull request should instead be reviewed, please simply
   write any comment. Even if closed, you can still revive the PR at any time or
   discuss it on the dev@druid.apache.org list.
   Thank you for your contributions.




[GitHub] [druid] github-code-scanning[bot] commented on a diff in pull request #14293: Remove `giant` lock from Overlord TaskQueue

Posted by "github-code-scanning[bot] (via GitHub)" <gi...@apache.org>.
github-code-scanning[bot] commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1195495474


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -749,131 +651,69 @@
           }
         }
     );
-    return statusFuture;
   }
 
   /**
-   * Resync the contents of this task queue with our storage facility. Useful to make sure our in-memory state
-   * corresponds to the storage facility even if the latter is manually modified.
+   * Resync the contents of this task queue with our storage facility.
+   * Useful to make sure our in-memory state corresponds to the storage facility
+   * even if the latter is manually modified.
+   * <p>
+   * This method must be called only when queue is {@link #active}, except when
+   * starting up.
    */
-  private void syncFromStorage()
+  private synchronized void syncFromStorage()
   {
-    giant.lock();
-
-    try {
-      if (active) {
-        final Map<String, Task> newTasks = toTaskIDMap(taskStorage.getActiveTasks());
-        final int tasksSynced = newTasks.size();
-        final Map<String, Task> oldTasks = new HashMap<>(tasks);
-
-        // Calculate differences on IDs instead of Task Objects.
-        Set<String> commonIds = Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet()));
-        for (String taskID : commonIds) {
-          newTasks.remove(taskID);
-          oldTasks.remove(taskID);
-        }
-        Collection<Task> addedTasks = newTasks.values();
-        Collection<Task> removedTasks = oldTasks.values();
-
-        // Clean up removed Tasks
-        for (Task task : removedTasks) {
-          removeTaskInternal(task.getId());
-        }
-
-        // Add newly Added tasks to the queue
-        for (Task task : addedTasks) {
-          addTaskInternal(task);
-        }
-
-        log.info(
-            "Synced %d tasks from storage (%d tasks added, %d tasks removed).",
-            tasksSynced,
-            addedTasks.size(),
-            removedTasks.size()
-        );
-        requestManagement();
-      } else {
-        log.info("Not active. Skipping storage sync.");
-      }
-    }
-    catch (Exception e) {
-      log.warn(e, "Failed to sync tasks from storage!");
-      throw new RuntimeException(e);
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  private static Map<String, Task> toTaskIDMap(List<Task> taskList)
-  {
-    Map<String, Task> rv = new HashMap<>();
-    for (Task task : taskList) {
-      rv.put(task.getId(), task);
-    }
-    return rv;
-  }
-
-  private Map<String, Long> getDeltaValues(Map<String, Long> total, Map<String, Long> prev)
-  {
-    return total.entrySet()
-                .stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L)));
-  }
+    final Map<String, Task> newTasks = taskStorage.getActiveTasks().stream().collect(
+        Collectors.toMap(Task::getId, Function.identity())
+    );
+    final Map<String, Task> oldTasks = new HashMap<>(tasks);
+
+    // Calculate differences on IDs instead of Task Objects.
+    Set<String> commonIds = Sets.intersection(newTasks.keySet(), oldTasks.keySet());
+    for (String taskId : commonIds) {
+      newTasks.remove(taskId);
+      oldTasks.remove(taskId);
+    }
+    Collection<Task> addedTasks = newTasks.values();
+    Collection<Task> removedTasks = oldTasks.values();
+
+    // Add new tasks and clean up removed tasks
+    addedTasks.forEach(this::addTaskInternal);
+    removedTasks.forEach(this::removeTaskInternal);
+    log.info(
+        "Synced [%d] tasks from storage. Added [%d] tasks, removed [%d] tasks.",
+        newTasks.size(), addedTasks.size(), removedTasks.size()
+    );
 
-  public Map<String, Long> getSuccessfulTaskCount()
-  {
-    Map<String, Long> total = CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get);
-    synchronized (totalSuccessfulTaskCount) {
-      Map<String, Long> delta = getDeltaValues(total, prevTotalSuccessfulTaskCount);
-      prevTotalSuccessfulTaskCount = total;
-      return delta;
-    }
+    requestManagement();
   }
 
-  public Map<String, Long> getFailedTaskCount()
+  public Map<String, Long> getAndResetSuccessfulTaskCounts()
   {
-    Map<String, Long> total = CollectionUtils.mapValues(totalFailedTaskCount, AtomicLong::get);
-    synchronized (totalFailedTaskCount) {
-      Map<String, Long> delta = getDeltaValues(total, prevTotalFailedTaskCount);
-      prevTotalFailedTaskCount = total;
-      return delta;
-    }
+    Map<String, Long> total = new HashMap<>(datasourceToSuccessfulTaskCount);
+    datasourceToSuccessfulTaskCount.clear();
+    return total;
   }
 
-  Map<String, String> getCurrentTaskDatasources()
+  public Map<String, Long> getAndResetFailedTaskCounts()
   {
-    giant.lock();
-    try {
-      return tasks.values().stream().collect(Collectors.toMap(Task::getId, Task::getDataSource));
-    }
-    finally {
-      giant.unlock();
-    }
+    Map<String, Long> total = new HashMap<>(datasourceToFailedTaskCount);
+    datasourceToFailedTaskCount.clear();
+    return total;
   }
 
   public Map<String, Long> getRunningTaskCount()
   {
-    Map<String, String> taskDatasources = getCurrentTaskDatasources();
-    return taskRunner.getRunningTasks()
-                     .stream()
-                     .collect(Collectors.toMap(
-                         e -> taskDatasources.getOrDefault(e.getTaskId(), ""),
-                         e -> 1L,
-                         Long::sum
-                     ));
+    return taskRunner.getRunningTasks().stream().collect(
+        Collectors.toMap(TaskRunnerWorkItem::getDataSource, task -> 1L, Long::sum)
+    );
   }
 
   public Map<String, Long> getPendingTaskCount()
   {
-    Map<String, String> taskDatasources = getCurrentTaskDatasources();
-    return taskRunner.getPendingTasks()
-                     .stream()
-                     .collect(Collectors.toMap(
-                         e -> taskDatasources.getOrDefault(e.getTaskId(), ""),
-                         e -> 1L,
-                         Long::sum
-                     ));
+    return taskRunner.getPendingTasks().stream().collect(
+        Collectors.toMap(TaskRunnerWorkItem::getDataSource, task -> 1L, Long::sum)

Review Comment:
   ## Useless parameter
   
   The parameter 'task' is never used.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4227)
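   The unused lambda parameter exists only because `toMap` needs a value mapper that always yields `1L`. One alternative is `Collectors.groupingBy` with `Collectors.counting()`, which produces the same per-datasource `Map<String, Long>`. The sketch uses a hypothetical `WorkItem` stand-in for `TaskRunnerWorkItem`.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for TaskRunnerWorkItem; only the datasource matters here.
class WorkItem
{
  private final String dataSource;

  WorkItem(String dataSource)
  {
    this.dataSource = dataSource;
  }

  String getDataSource()
  {
    return dataSource;
  }
}

class TaskCounts
{
  // Counts work items per datasource without a lambda that ignores its argument.
  static Map<String, Long> countByDataSource(List<WorkItem> items)
  {
    return items.stream().collect(
        Collectors.groupingBy(WorkItem::getDataSource, Collectors.counting())
    );
  }
}
```

   `groupingBy` rejects `null` keys with a `NullPointerException`, so if `getDataSource()` can return `null`, map it to a placeholder first (the removed code used `""` for tasks it could not resolve).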



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -749,131 +651,69 @@
           }
         }
     );
-    return statusFuture;
   }
 
   /**
-   * Resync the contents of this task queue with our storage facility. Useful to make sure our in-memory state
-   * corresponds to the storage facility even if the latter is manually modified.
+   * Resync the contents of this task queue with our storage facility.
+   * Useful to make sure our in-memory state corresponds to the storage facility
+   * even if the latter is manually modified.
+   * <p>
+   * This method must be called only when queue is {@link #active}, except when
+   * starting up.
    */
-  private void syncFromStorage()
+  private synchronized void syncFromStorage()
   {
-    giant.lock();
-
-    try {
-      if (active) {
-        final Map<String, Task> newTasks = toTaskIDMap(taskStorage.getActiveTasks());
-        final int tasksSynced = newTasks.size();
-        final Map<String, Task> oldTasks = new HashMap<>(tasks);
-
-        // Calculate differences on IDs instead of Task Objects.
-        Set<String> commonIds = Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet()));
-        for (String taskID : commonIds) {
-          newTasks.remove(taskID);
-          oldTasks.remove(taskID);
-        }
-        Collection<Task> addedTasks = newTasks.values();
-        Collection<Task> removedTasks = oldTasks.values();
-
-        // Clean up removed Tasks
-        for (Task task : removedTasks) {
-          removeTaskInternal(task.getId());
-        }
-
-        // Add newly Added tasks to the queue
-        for (Task task : addedTasks) {
-          addTaskInternal(task);
-        }
-
-        log.info(
-            "Synced %d tasks from storage (%d tasks added, %d tasks removed).",
-            tasksSynced,
-            addedTasks.size(),
-            removedTasks.size()
-        );
-        requestManagement();
-      } else {
-        log.info("Not active. Skipping storage sync.");
-      }
-    }
-    catch (Exception e) {
-      log.warn(e, "Failed to sync tasks from storage!");
-      throw new RuntimeException(e);
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  private static Map<String, Task> toTaskIDMap(List<Task> taskList)
-  {
-    Map<String, Task> rv = new HashMap<>();
-    for (Task task : taskList) {
-      rv.put(task.getId(), task);
-    }
-    return rv;
-  }
-
-  private Map<String, Long> getDeltaValues(Map<String, Long> total, Map<String, Long> prev)
-  {
-    return total.entrySet()
-                .stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L)));
-  }
+    final Map<String, Task> newTasks = taskStorage.getActiveTasks().stream().collect(
+        Collectors.toMap(Task::getId, Function.identity())
+    );
+    final Map<String, Task> oldTasks = new HashMap<>(tasks);
+
+    // Calculate differences on IDs instead of Task Objects.
+    Set<String> commonIds = Sets.intersection(newTasks.keySet(), oldTasks.keySet());
+    for (String taskId : commonIds) {
+      newTasks.remove(taskId);
+      oldTasks.remove(taskId);
+    }
+    Collection<Task> addedTasks = newTasks.values();
+    Collection<Task> removedTasks = oldTasks.values();
+
+    // Add new tasks and clean up removed tasks
+    addedTasks.forEach(this::addTaskInternal);
+    removedTasks.forEach(this::removeTaskInternal);
+    log.info(
+        "Synced [%d] tasks from storage. Added [%d] tasks, removed [%d] tasks.",
+        newTasks.size(), addedTasks.size(), removedTasks.size()
+    );
 
-  public Map<String, Long> getSuccessfulTaskCount()
-  {
-    Map<String, Long> total = CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get);
-    synchronized (totalSuccessfulTaskCount) {
-      Map<String, Long> delta = getDeltaValues(total, prevTotalSuccessfulTaskCount);
-      prevTotalSuccessfulTaskCount = total;
-      return delta;
-    }
+    requestManagement();
   }
 
-  public Map<String, Long> getFailedTaskCount()
+  public Map<String, Long> getAndResetSuccessfulTaskCounts()
   {
-    Map<String, Long> total = CollectionUtils.mapValues(totalFailedTaskCount, AtomicLong::get);
-    synchronized (totalFailedTaskCount) {
-      Map<String, Long> delta = getDeltaValues(total, prevTotalFailedTaskCount);
-      prevTotalFailedTaskCount = total;
-      return delta;
-    }
+    Map<String, Long> total = new HashMap<>(datasourceToSuccessfulTaskCount);
+    datasourceToSuccessfulTaskCount.clear();
+    return total;
   }
 
-  Map<String, String> getCurrentTaskDatasources()
+  public Map<String, Long> getAndResetFailedTaskCounts()
   {
-    giant.lock();
-    try {
-      return tasks.values().stream().collect(Collectors.toMap(Task::getId, Task::getDataSource));
-    }
-    finally {
-      giant.unlock();
-    }
+    Map<String, Long> total = new HashMap<>(datasourceToFailedTaskCount);
+    datasourceToFailedTaskCount.clear();
+    return total;
   }
 
   public Map<String, Long> getRunningTaskCount()
   {
-    Map<String, String> taskDatasources = getCurrentTaskDatasources();
-    return taskRunner.getRunningTasks()
-                     .stream()
-                     .collect(Collectors.toMap(
-                         e -> taskDatasources.getOrDefault(e.getTaskId(), ""),
-                         e -> 1L,
-                         Long::sum
-                     ));
+    return taskRunner.getRunningTasks().stream().collect(
+        Collectors.toMap(TaskRunnerWorkItem::getDataSource, task -> 1L, Long::sum)

Review Comment:
   ## Useless parameter
   
   The parameter 'task' is never used.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4226)



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +153,248 @@
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared after notifyStatus is processed
+    for (Task task : tasksToFail) {
+      for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+        taskStorage.removeLock(task.getId(), lock);
       }
-      managerExec.submit(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              while (true) {
-                try {
-                  manage();
-                  break;
-                }
-                catch (InterruptedException e) {
-                  log.info("Interrupted, exiting!");
-                  break;
-                }
-                catch (Exception e) {
-                  final long restartDelay = config.getRestartDelay().getMillis();
-                  log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit();
-                  try {
-                    Thread.sleep(restartDelay);
-                  }
-                  catch (InterruptedException e2) {
-                    log.info("Interrupted, exiting!");
-                    break;
-                  }
-                }
-              }
+    }
+    log.info("Cleaned up [%d] tasks which failed to reacquire locks.", tasksToFail.size());
+
+    // Submit task management job
+    managerExec.submit(
+        () -> {
+          log.info("Beginning task management in [%s].", config.getStartDelay());
+          long startDelayMillis = config.getStartDelay().getMillis();
+          while (active) {
+            try {
+              Thread.sleep(startDelayMillis);
+              runTaskManagement();
+            }
+            catch (InterruptedException e) {
+              log.info("Interrupted, stopping task management.");
+              break;
+            }
+            catch (Exception e) {
+              startDelayMillis = config.getRestartDelay().getMillis();
+              log.makeAlert(e, "Failed to manage").addData("restartDelay", startDelayMillis).emit();
             }
           }
-      );
-      ScheduledExecutors.scheduleAtFixedRate(
-          storageSyncExec,
-          config.getStorageSyncRate(),
-          new Callable<ScheduledExecutors.Signal>()
-          {
-            @Override
-            public ScheduledExecutors.Signal call()
-            {
-              try {
-                syncFromStorage();
-              }
-              catch (Exception e) {
-                if (active) {
-                  log.makeAlert(e, "Failed to sync with storage").emit();
-                }
-              }
-              if (active) {
-                return ScheduledExecutors.Signal.REPEAT;
-              } else {
-                return ScheduledExecutors.Signal.STOP;
-              }
+        }
+    );
+
+    // Schedule storage sync job
+    ScheduledExecutors.scheduleAtFixedRate(
+        storageSyncExec,
+        config.getStorageSyncRate(),
+        () -> {
+          if (!active) {
+            log.info("Stopping storage sync as TaskQueue has been stopped");
+            return ScheduledExecutors.Signal.STOP;
+          }
+
+          try {
+            syncFromStorage();
+          }
+          catch (Exception e) {
+            if (active) {
+              log.makeAlert(e, "Failed to sync with storage").emit();
             }
           }
-      );
-      requestManagement();
-      // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
-      // This is called after requesting management as locks need to be cleared after notifyStatus is processed
-      for (Task task : tasksToFail) {
-        for (TaskLock lock : taskStorage.getLocks(task.getId())) {
-          taskStorage.removeLock(task.getId(), lock);
+
+          return active ? ScheduledExecutors.Signal.REPEAT : ScheduledExecutors.Signal.STOP;
         }
-      }
-    }
-    finally {
-      giant.unlock();
-    }
+    );
   }
 
   /**
    * Shuts down the queue.
    */
   @LifecycleStop
-  public void stop()
+  public synchronized void stop()
   {
-    giant.lock();
-
-    try {
-      tasks.clear();
-      taskFutures.clear();
-      active = false;
-      managerExec.shutdownNow();
-      storageSyncExec.shutdownNow();
-      requestManagement();
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  public boolean isActive()
-  {
-    return active;
+    active = false;
+    tasks.clear();
+    submittedTaskIds.clear();
+    recentlyCompletedTaskIds.clear();
+    managerExec.shutdownNow();
+    storageSyncExec.shutdownNow();
+    requestManagement();
   }
 
   /**
    * Request management from the management thread. Non-blocking.
-   *
-   * Other callers (such as notifyStatus) should trigger activity on the
-   * TaskQueue thread by requesting management here.
+   * <p>
+   * Callers (such as notifyStatus) can trigger task management by calling
+   * this method.
    */
-  void requestManagement()
+  private void requestManagement()
   {
-    // use a BlockingQueue since the offer/poll/wait behaviour is simple
-    // and very easy to reason about
-
-    // the request has to be offer (non blocking), since someone might request
-    // while already holding giant lock
-
-    // do not care if the item fits into the queue:
-    // if the queue is already full, request has been triggered anyway
-    managementMayBeNecessary.offer(this);
+    synchronized (managementRequested) {
+      managementRequested.set(true);
+      managementRequested.notify();

Review Comment:
   ## notify instead of notifyAll
   
   Using notify rather than notifyAll.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4228)
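   If the management thread is the only thread that ever waits on `managementRequested`, `notify()` wakes it just as well as `notifyAll()`; the scanner flags it because correctness then depends on that single-waiter invariant. The hypothetical `Gate` class below shows where the difference matters: several threads wait on one monitor for a condition that stays true once set, and only `notifyAll()` is guaranteed to release all of them.

```java
// Hypothetical sketch illustrating notifyAll with multiple waiters.
class Gate
{
  private final Object lock = new Object();
  private boolean released = false;  // guarded by lock

  void await() throws InterruptedException
  {
    synchronized (lock) {
      // Re-check the condition after every wake-up
      while (!released) {
        lock.wait();
      }
    }
  }

  void release()
  {
    synchronized (lock) {
      released = true;
      // notify() would wake only one waiter; the others could stay blocked indefinitely
      lock.notifyAll();
    }
  }
}
```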





[GitHub] [druid] github-code-scanning[bot] commented on a diff in pull request #14293: Remove `giant` lock from Overlord TaskQueue

Posted by "github-code-scanning[bot] (via GitHub)" <gi...@apache.org>.
github-code-scanning[bot] commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1195917921


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +155,245 @@
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared after notifyStatus is processed
+    for (Task task : tasksToFail) {
+      for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+        taskStorage.removeLock(task.getId(), lock);
       }
-      managerExec.submit(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              while (true) {
-                try {
-                  manage();
-                  break;
-                }
-                catch (InterruptedException e) {
-                  log.info("Interrupted, exiting!");
-                  break;
-                }
-                catch (Exception e) {
-                  final long restartDelay = config.getRestartDelay().getMillis();
-                  log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit();
-                  try {
-                    Thread.sleep(restartDelay);
-                  }
-                  catch (InterruptedException e2) {
-                    log.info("Interrupted, exiting!");
-                    break;
-                  }
-                }
-              }
+    }
+    log.info("Cleaned up [%d] tasks which failed to reacquire locks.", tasksToFail.size());
+
+    // Submit task management job
+    managerExec.submit(
+        () -> {
+          log.info("Beginning task management in [%s].", config.getStartDelay());
+          long startDelayMillis = config.getStartDelay().getMillis();
+          while (active) {
+            try {
+              Thread.sleep(startDelayMillis);
+              runTaskManagement();
+            }
+            catch (InterruptedException e) {
+              log.info("Interrupted, stopping task management.");
+              break;
+            }
+            catch (Exception e) {
+              startDelayMillis = config.getRestartDelay().getMillis();
+              log.makeAlert(e, "Failed to manage").addData("restartDelay", startDelayMillis).emit();
             }
           }
-      );
-      ScheduledExecutors.scheduleAtFixedRate(
-          storageSyncExec,
-          config.getStorageSyncRate(),
-          new Callable<ScheduledExecutors.Signal>()
-          {
-            @Override
-            public ScheduledExecutors.Signal call()
-            {
-              try {
-                syncFromStorage();
-              }
-              catch (Exception e) {
-                if (active) {
-                  log.makeAlert(e, "Failed to sync with storage").emit();
-                }
-              }
-              if (active) {
-                return ScheduledExecutors.Signal.REPEAT;
-              } else {
-                return ScheduledExecutors.Signal.STOP;
-              }
+        }
+    );
+
+    // Schedule storage sync job
+    ScheduledExecutors.scheduleAtFixedRate(
+        storageSyncExec,
+        config.getStorageSyncRate(),
+        () -> {
+          if (!active) {
+            log.info("Stopping storage sync as TaskQueue has been stopped");
+            return ScheduledExecutors.Signal.STOP;
+          }
+
+          try {
+            syncFromStorage();
+          }
+          catch (Exception e) {
+            if (active) {
+              log.makeAlert(e, "Failed to sync with storage").emit();
             }
           }
-      );
-      requestManagement();
-      // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
-      // This is called after requesting management as locks need to be cleared after notifyStatus is processed
-      for (Task task : tasksToFail) {
-        for (TaskLock lock : taskStorage.getLocks(task.getId())) {
-          taskStorage.removeLock(task.getId(), lock);
+
+          return active ? ScheduledExecutors.Signal.REPEAT : ScheduledExecutors.Signal.STOP;
         }
-      }
-    }
-    finally {
-      giant.unlock();
-    }
+    );
   }
 
   /**
    * Shuts down the queue.
    */
   @LifecycleStop
-  public void stop()
-  {
-    giant.lock();
-
-    try {
-      tasks.clear();
-      taskFutures.clear();
-      active = false;
-      managerExec.shutdownNow();
-      storageSyncExec.shutdownNow();
-      requestManagement();
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  public boolean isActive()
+  public synchronized void stop()
   {
-    return active;
+    active = false;
+    tasks.clear();
+    submittedTaskIds.clear();
+    recentlyCompletedTaskIds.clear();
+    managerExec.shutdownNow();
+    storageSyncExec.shutdownNow();
+    requestManagement();
   }
 
   /**
    * Request management from the management thread. Non-blocking.
-   *
-   * Other callers (such as notifyStatus) should trigger activity on the
-   * TaskQueue thread by requesting management here.
+   * <p>
+   * Callers (such as notifyStatus) can trigger task management by calling
+   * this method.
    */
-  void requestManagement()
+  private void requestManagement()
   {
-    // use a BlockingQueue since the offer/poll/wait behaviour is simple
-    // and very easy to reason about
-
-    // the request has to be offer (non blocking), since someone might request
-    // while already holding giant lock
-
     // do not care if the item fits into the queue:
     // if the queue is already full, request has been triggered anyway
-    managementMayBeNecessary.offer(this);
+    managementRequestQueue.offer(new Object());

Review Comment:
   ## Ignored error status of call
   
   Method requestManagement ignores exceptional return value of BlockingQueue<Object>.offer.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4971)
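For context on the alert above: when a bounded `BlockingQueue` is used only as a wake-up signal, a `false` return from `offer` just means a request is already pending, so ignoring it loses nothing. The sketch below is a minimal, hypothetical illustration of that pattern. `ManagementSignal`, the capacity of 1 and the timeouts are assumptions, not Druid code. Extra requests are coalesced, and the waiting side drains the queue after each wake-up.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: a bounded queue used purely as a "wake-up" signal.
class ManagementSignal
{
  // Capacity is an assumption; any small bound works because only "non-empty" matters.
  private final BlockingQueue<Object> requests = new ArrayBlockingQueue<>(1);

  /** Non-blocking. Returns false if a request was already pending (the request is coalesced). */
  boolean requestManagement()
  {
    return requests.offer(new Object());
  }

  /** Waits up to timeoutMillis for a request, then clears any extra pending requests. */
  boolean awaitManagementRequest(long timeoutMillis) throws InterruptedException
  {
    Object request = requests.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    requests.clear();
    return request != null;
  }

  public static void main(String[] args) throws InterruptedException
  {
    ManagementSignal signal = new ManagementSignal();
    System.out.println(signal.requestManagement());         // true: queue was empty
    System.out.println(signal.requestManagement());         // false: already pending, nothing lost
    System.out.println(signal.awaitManagementRequest(100)); // true: one wake-up for both requests
    System.out.println(signal.awaitManagementRequest(10));  // false: no pending request, times out
  }
}
```

A single wake-up serves any number of requests made before the management thread gets to run, which is the behaviour the original comment ("if the queue is already full, request has been triggered anyway") relies on.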



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] kfaraz commented on a diff in pull request #14293: Remove `giant` lock from Overlord TaskQueue

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1195854805


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -488,58 +426,56 @@ public boolean add(final Task task) throws EntryExistsException
     // Set forceTimeChunkLock before adding task spec to taskStorage, so that we can see always consistent task spec.
     task.addToContextIfAbsent(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockConfig.isForceTimeChunkLock());
     defaultTaskConfig.getContext().forEach(task::addToContextIfAbsent);
-    // Every task shuold use the lineage-based segment allocation protocol unless it is explicitly set to
+    // Every task should use the lineage-based segment allocation protocol unless it is explicitly set to
     // using the legacy protocol.
     task.addToContextIfAbsent(
         SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY,
         SinglePhaseParallelIndexTaskRunner.DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION
     );
 
-    giant.lock();
-
-    try {
-      Preconditions.checkState(active, "Queue is not active!");
-      Preconditions.checkNotNull(task, "task");
-      Preconditions.checkState(tasks.size() < config.getMaxSize(), "Too many tasks (max = %,d)", config.getMaxSize());
-
-      // If this throws with any sort of exception, including TaskExistsException, we don't want to
-      // insert the task into our queue. So don't catch it.
-      taskStorage.insert(task, TaskStatus.running(task.getId()));
-      addTaskInternal(task);
-      requestManagement();
-      return true;
-    }
-    finally {
-      giant.unlock();
-    }
+    // Do not add the task to queue if insert into metadata fails for any reason
+    taskStorage.insert(task, TaskStatus.running(task.getId()));
+    addTaskInternal(task);
+    requestManagement();
+    return true;
   }
 
-  @GuardedBy("giant")
+  /**
+   * Atomically adds this task to the TaskQueue.
+   */
   private void addTaskInternal(final Task task)
   {
-    final Task existingTask = tasks.putIfAbsent(task.getId(), task);
-
-    if (existingTask == null) {
-      taskLockbox.add(task);
-    } else if (!existingTask.equals(task)) {
-      throw new ISE("Cannot add task ID [%s] with same ID as task that has already been added", task.getId());
-    }
+    tasks.computeIfAbsent(
+        task.getId(),
+        taskId -> {

Review Comment:
   Yeah, it is intentional. I will add a comment calling out why it must be a ConcurrentHashMap.
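The reason it has to be a `ConcurrentHashMap`: its `computeIfAbsent` is specified to perform the whole invocation atomically and to apply the mapping function at most once per key, so a side effect inside the lambda cannot run twice when two threads add the same task ID. A plain `HashMap` is not safe for concurrent use at all. A minimal sketch with hypothetical names (`AtomicTaskRegistry`, and a counter standing in for a side effect such as registering the task with a lock manager), not the actual `TaskQueue` code:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an "add task atomically" method.
// "registrations" plays the role of a side effect that must happen exactly once per task ID.
class AtomicTaskRegistry
{
  private final Map<String, String> tasks = new ConcurrentHashMap<>();
  final AtomicInteger registrations = new AtomicInteger();

  void addTask(String taskId, String payload)
  {
    String existing = tasks.computeIfAbsent(
        taskId,
        id -> {
          // Applied at most once per key, atomically with the insertion
          registrations.incrementAndGet();
          return payload;
        }
    );
    if (!existing.equals(payload)) {
      throw new IllegalStateException("Cannot add task ID [" + taskId + "] with a different payload");
    }
  }

  public static void main(String[] args) throws InterruptedException
  {
    AtomicTaskRegistry registry = new AtomicTaskRegistry();
    ExecutorService pool = Executors.newFixedThreadPool(8);
    for (int i = 0; i < 100; i++) {
      pool.submit(() -> registry.addTask("task-1", "spec-1"));
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    System.out.println(registry.registrations.get()); // 1
  }
}
```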



[GitHub] [druid] kfaraz commented on a diff in pull request #14293: Remove `giant` lock from Overlord TaskQueue

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1195854164


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +153,248 @@ public TaskQueue(
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared after notifyStatus is processed
+    for (Task task : tasksToFail) {
+      for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+        taskStorage.removeLock(task.getId(), lock);
       }
-      managerExec.submit(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              while (true) {
-                try {
-                  manage();
-                  break;
-                }
-                catch (InterruptedException e) {
-                  log.info("Interrupted, exiting!");
-                  break;
-                }
-                catch (Exception e) {
-                  final long restartDelay = config.getRestartDelay().getMillis();
-                  log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit();
-                  try {
-                    Thread.sleep(restartDelay);
-                  }
-                  catch (InterruptedException e2) {
-                    log.info("Interrupted, exiting!");
-                    break;
-                  }
-                }
-              }
+    }
+    log.info("Cleaned up [%d] tasks which failed to reacquire locks.", tasksToFail.size());
+
+    // Submit task management job
+    managerExec.submit(
+        () -> {
+          log.info("Beginning task management in [%s].", config.getStartDelay());
+          long startDelayMillis = config.getStartDelay().getMillis();
+          while (active) {
+            try {
+              Thread.sleep(startDelayMillis);
+              runTaskManagement();
+            }
+            catch (InterruptedException e) {
+              log.info("Interrupted, stopping task management.");
+              break;
+            }
+            catch (Exception e) {
+              startDelayMillis = config.getRestartDelay().getMillis();
+              log.makeAlert(e, "Failed to manage").addData("restartDelay", startDelayMillis).emit();
             }
           }
-      );
-      ScheduledExecutors.scheduleAtFixedRate(
-          storageSyncExec,
-          config.getStorageSyncRate(),
-          new Callable<ScheduledExecutors.Signal>()
-          {
-            @Override
-            public ScheduledExecutors.Signal call()
-            {
-              try {
-                syncFromStorage();
-              }
-              catch (Exception e) {
-                if (active) {
-                  log.makeAlert(e, "Failed to sync with storage").emit();
-                }
-              }
-              if (active) {
-                return ScheduledExecutors.Signal.REPEAT;
-              } else {
-                return ScheduledExecutors.Signal.STOP;
-              }
+        }
+    );
+
+    // Schedule storage sync job
+    ScheduledExecutors.scheduleAtFixedRate(
+        storageSyncExec,
+        config.getStorageSyncRate(),
+        () -> {
+          if (!active) {
+            log.info("Stopping storage sync as TaskQueue has been stopped");
+            return ScheduledExecutors.Signal.STOP;
+          }
+
+          try {
+            syncFromStorage();
+          }
+          catch (Exception e) {
+            if (active) {
+              log.makeAlert(e, "Failed to sync with storage").emit();
             }
           }
-      );
-      requestManagement();
-      // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
-      // This is called after requesting management as locks need to be cleared after notifyStatus is processed
-      for (Task task : tasksToFail) {
-        for (TaskLock lock : taskStorage.getLocks(task.getId())) {
-          taskStorage.removeLock(task.getId(), lock);
+
+          return active ? ScheduledExecutors.Signal.REPEAT : ScheduledExecutors.Signal.STOP;
         }
-      }
-    }
-    finally {
-      giant.unlock();
-    }
+    );
   }
 
   /**
    * Shuts down the queue.
    */
   @LifecycleStop
-  public void stop()
+  public synchronized void stop()
   {
-    giant.lock();
-
-    try {
-      tasks.clear();
-      taskFutures.clear();
-      active = false;
-      managerExec.shutdownNow();
-      storageSyncExec.shutdownNow();
-      requestManagement();
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  public boolean isActive()
-  {
-    return active;
+    active = false;
+    tasks.clear();
+    submittedTaskIds.clear();
+    recentlyCompletedTaskIds.clear();
+    managerExec.shutdownNow();
+    storageSyncExec.shutdownNow();
+    requestManagement();
   }
 
   /**
    * Request management from the management thread. Non-blocking.
-   *
-   * Other callers (such as notifyStatus) should trigger activity on the
-   * TaskQueue thread by requesting management here.
+   * <p>
+   * Callers (such as notifyStatus) can trigger task management by calling
+   * this method.
    */
-  void requestManagement()
+  private void requestManagement()
   {
-    // use a BlockingQueue since the offer/poll/wait behaviour is simple
-    // and very easy to reason about
-
-    // the request has to be offer (non blocking), since someone might request
-    // while already holding giant lock
-
-    // do not care if the item fits into the queue:
-    // if the queue is already full, request has been triggered anyway
-    managementMayBeNecessary.offer(this);
+    synchronized (managementRequested) {
+      managementRequested.set(true);
+      managementRequested.notify();
+    }
   }
 
   /**
-   * Await for an event to manage.
-   *
-   * This should only be called from the management thread to wait for activity.
+   * Waits for a management request to be triggered by another thread.
    *
-   * @param nanos
-   * @throws InterruptedException
+   * @throws InterruptedException if the thread is interrupted while waiting.
    */
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "using queue as notification mechanism, result has no value")
-  void awaitManagementNanos(long nanos) throws InterruptedException
+  private void awaitManagementRequest() throws InterruptedException
   {
     // mitigate a busy loop, it can get pretty busy when there are a lot of start/stops
     try {
-      Thread.sleep(MIN_WAIT_TIME_MS);
+      Thread.sleep(MIN_WAIT_TIME_MILLIS);
     }
     catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
 
-    // wait for an item, if an item arrives (or is already available), complete immediately
-    // (does not actually matter what the item is)
-    managementMayBeNecessary.poll(nanos - (TimeUnit.MILLISECONDS.toNanos(MIN_WAIT_TIME_MS)), TimeUnit.NANOSECONDS);
-
-    // there may have been multiple requests, clear them all
-    managementMayBeNecessary.clear();
+    // Wait for management to be requested
+    synchronized (managementRequested) {
+      while (!managementRequested.get()) {
+        managementRequested.wait(MANAGEMENT_WAIT_TIMEOUT_MILLIS - MIN_WAIT_TIME_MILLIS);
+      }
+      managementRequested.compareAndSet(true, false);

Review Comment:
   Yeah, you are right. I will update it to just use a simple `set()`.
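Inside `synchronized (managementRequested)`, the `while` loop only exits once the flag is `true`, and every writer shown in the diff updates it while holding the same monitor, so `compareAndSet(true, false)` cannot fail at that point and a plain `set(false)` is equivalent. A minimal, self-contained sketch of the flag-plus-monitor handshake with that change; `ManagementFlag` and the timeouts are hypothetical, and the sketch also bounds the total wait with a deadline so the call returns even if no request arrives, which is a choice of this sketch.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: a flag guarded by its own monitor wakes a single management thread.
class ManagementFlag
{
  private final AtomicBoolean managementRequested = new AtomicBoolean(false);

  /** Sets the flag and wakes the waiting thread; only holds the monitor briefly. */
  void requestManagement()
  {
    synchronized (managementRequested) {
      managementRequested.set(true);
      // A single management thread is assumed, so notify() is sufficient
      managementRequested.notify();
    }
  }

  /** Waits until management is requested or timeoutMillis elapses; returns true if it was requested. */
  boolean awaitManagementRequest(long timeoutMillis) throws InterruptedException
  {
    final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    synchronized (managementRequested) {
      // Loop to tolerate spurious wake-ups
      while (!managementRequested.get()) {
        long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime());
        if (remainingMillis <= 0) {
          return false;
        }
        managementRequested.wait(remainingMillis);
      }
      // The flag is known to be true here and every writer holds this monitor,
      // so a plain set() is equivalent to compareAndSet(true, false)
      managementRequested.set(false);
      return true;
    }
  }

  public static void main(String[] args) throws InterruptedException
  {
    ManagementFlag flag = new ManagementFlag();
    Thread requester = new Thread(() -> {
      try {
        Thread.sleep(50);
      }
      catch (InterruptedException ignored) {
        Thread.currentThread().interrupt();
      }
      flag.requestManagement();
    });
    requester.start();
    System.out.println(flag.awaitManagementRequest(5_000)); // expected: true, woken by the requester
    System.out.println(flag.awaitManagementRequest(20));    // expected: false, flag was reset
    requester.join();
  }
}
```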



[GitHub] [druid] kfaraz commented on a diff in pull request #14293: Remove `giant` lock from Overlord TaskQueue

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1195856085


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -749,131 +651,69 @@ private void handleStatus(final TaskStatus status)
           }
         }
     );
-    return statusFuture;
   }
 
   /**
-   * Resync the contents of this task queue with our storage facility. Useful to make sure our in-memory state
-   * corresponds to the storage facility even if the latter is manually modified.
+   * Resync the contents of this task queue with our storage facility.
+   * Useful to make sure our in-memory state corresponds to the storage facility
+   * even if the latter is manually modified.
+   * <p>
+   * This method must be called only when queue is {@link #active}, except when
+   * starting up.
    */
-  private void syncFromStorage()
+  private synchronized void syncFromStorage()
   {
-    giant.lock();
-
-    try {
-      if (active) {
-        final Map<String, Task> newTasks = toTaskIDMap(taskStorage.getActiveTasks());
-        final int tasksSynced = newTasks.size();
-        final Map<String, Task> oldTasks = new HashMap<>(tasks);
-
-        // Calculate differences on IDs instead of Task Objects.
-        Set<String> commonIds = Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet()));
-        for (String taskID : commonIds) {
-          newTasks.remove(taskID);
-          oldTasks.remove(taskID);
-        }
-        Collection<Task> addedTasks = newTasks.values();
-        Collection<Task> removedTasks = oldTasks.values();
-
-        // Clean up removed Tasks
-        for (Task task : removedTasks) {
-          removeTaskInternal(task.getId());
-        }
-
-        // Add newly Added tasks to the queue
-        for (Task task : addedTasks) {
-          addTaskInternal(task);
-        }
-
-        log.info(
-            "Synced %d tasks from storage (%d tasks added, %d tasks removed).",
-            tasksSynced,
-            addedTasks.size(),
-            removedTasks.size()
-        );
-        requestManagement();
-      } else {
-        log.info("Not active. Skipping storage sync.");
-      }
-    }
-    catch (Exception e) {
-      log.warn(e, "Failed to sync tasks from storage!");
-      throw new RuntimeException(e);
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  private static Map<String, Task> toTaskIDMap(List<Task> taskList)
-  {
-    Map<String, Task> rv = new HashMap<>();
-    for (Task task : taskList) {
-      rv.put(task.getId(), task);
-    }
-    return rv;
-  }
-
-  private Map<String, Long> getDeltaValues(Map<String, Long> total, Map<String, Long> prev)
-  {
-    return total.entrySet()
-                .stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L)));
-  }
+    final Map<String, Task> newTasks = taskStorage.getActiveTasks().stream().collect(
+        Collectors.toMap(Task::getId, Function.identity())
+    );
+    final Map<String, Task> oldTasks = new HashMap<>(tasks);
+
+    // Calculate differences on IDs instead of Task Objects.
+    Set<String> commonIds = Sets.intersection(newTasks.keySet(), oldTasks.keySet());
+    for (String taskId : commonIds) {
+      newTasks.remove(taskId);
+      oldTasks.remove(taskId);
+    }
+    Collection<Task> addedTasks = newTasks.values();
+    Collection<Task> removedTasks = oldTasks.values();
+
+    // Add new tasks and clean up removed tasks
+    addedTasks.forEach(this::addTaskInternal);
+    removedTasks.forEach(this::removeTaskInternal);
+    log.info(

Review Comment:
   I don't think the footprint of this call would make much of a difference compared to the earlier operations (fetching from the DB and manipulating the maps).
   
   But yeah, you never really know with logging. I suppose I could just return the counts from this method and log them in the caller instead.
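Since `syncFromStorage()` is `synchronized` in the diff, any logging inside it runs while the monitor is held. A minimal sketch of the alternative mentioned above, with hypothetical names (`StorageSync`, `SyncResult`, `runSync`) and string payloads standing in for `Task` objects: the synchronized method only computes and applies the ID diff and returns the counts, and the caller logs after the lock has been released.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: the synchronized sync reports what changed; the caller does the logging.
class StorageSync
{
  /** Immutable summary of one sync run. */
  static final class SyncResult
  {
    final int synced;
    final int added;
    final int removed;

    SyncResult(int synced, int added, int removed)
    {
      this.synced = synced;
      this.added = added;
      this.removed = removed;
    }
  }

  // In-memory view of active tasks keyed by task ID; guarded by "this"
  private final Map<String, String> tasks = new HashMap<>();

  /** Diffs on task IDs only, applies the changes and returns the counts instead of logging them. */
  synchronized SyncResult syncFromStorage(Map<String, String> fromStorage)
  {
    Set<String> addedIds = new HashSet<>(fromStorage.keySet());
    addedIds.removeAll(tasks.keySet());
    Set<String> removedIds = new HashSet<>(tasks.keySet());
    removedIds.removeAll(fromStorage.keySet());

    for (String id : addedIds) {
      tasks.put(id, fromStorage.get(id));
    }
    for (String id : removedIds) {
      tasks.remove(id);
    }
    return new SyncResult(fromStorage.size(), addedIds.size(), removedIds.size());
  }

  /** Runs a sync and logs the outcome after the monitor has been released. */
  void runSync(Map<String, String> fromStorage)
  {
    SyncResult result = syncFromStorage(fromStorage);
    System.out.printf(
        "Synced %d tasks from storage (%d tasks added, %d tasks removed).%n",
        result.synced, result.added, result.removed
    );
  }

  public static void main(String[] args)
  {
    StorageSync sync = new StorageSync();
    Map<String, String> first = new HashMap<>();
    first.put("a", "task-a");
    first.put("b", "task-b");
    sync.runSync(first);  // prints "Synced 2 tasks from storage (2 tasks added, 0 tasks removed)."

    Map<String, String> second = new HashMap<>();
    second.put("b", "task-b");
    second.put("c", "task-c");
    sync.runSync(second); // prints "Synced 2 tasks from storage (1 tasks added, 1 tasks removed)."
  }
}
```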



[GitHub] [druid] kfaraz commented on a diff in pull request #14293: Remove `giant` lock from Overlord TaskQueue

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1195885318


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +153,248 @@ public TaskQueue(
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared after notifyStatus is processed
+    for (Task task : tasksToFail) {
+      for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+        taskStorage.removeLock(task.getId(), lock);
       }
-      managerExec.submit(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              while (true) {
-                try {
-                  manage();
-                  break;
-                }
-                catch (InterruptedException e) {
-                  log.info("Interrupted, exiting!");
-                  break;
-                }
-                catch (Exception e) {
-                  final long restartDelay = config.getRestartDelay().getMillis();
-                  log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit();
-                  try {
-                    Thread.sleep(restartDelay);
-                  }
-                  catch (InterruptedException e2) {
-                    log.info("Interrupted, exiting!");
-                    break;
-                  }
-                }
-              }
+    }
+    log.info("Cleaned up [%d] tasks which failed to reacquire locks.", tasksToFail.size());
+
+    // Submit task management job
+    managerExec.submit(
+        () -> {
+          log.info("Beginning task management in [%s].", config.getStartDelay());
+          long startDelayMillis = config.getStartDelay().getMillis();
+          while (active) {
+            try {
+              Thread.sleep(startDelayMillis);
+              runTaskManagement();
+            }
+            catch (InterruptedException e) {
+              log.info("Interrupted, stopping task management.");
+              break;
+            }
+            catch (Exception e) {
+              startDelayMillis = config.getRestartDelay().getMillis();
+              log.makeAlert(e, "Failed to manage").addData("restartDelay", startDelayMillis).emit();
             }
           }
-      );
-      ScheduledExecutors.scheduleAtFixedRate(
-          storageSyncExec,
-          config.getStorageSyncRate(),
-          new Callable<ScheduledExecutors.Signal>()
-          {
-            @Override
-            public ScheduledExecutors.Signal call()
-            {
-              try {
-                syncFromStorage();
-              }
-              catch (Exception e) {
-                if (active) {
-                  log.makeAlert(e, "Failed to sync with storage").emit();
-                }
-              }
-              if (active) {
-                return ScheduledExecutors.Signal.REPEAT;
-              } else {
-                return ScheduledExecutors.Signal.STOP;
-              }
+        }
+    );
+
+    // Schedule storage sync job
+    ScheduledExecutors.scheduleAtFixedRate(
+        storageSyncExec,
+        config.getStorageSyncRate(),
+        () -> {
+          if (!active) {
+            log.info("Stopping storage sync as TaskQueue has been stopped");
+            return ScheduledExecutors.Signal.STOP;
+          }
+
+          try {
+            syncFromStorage();
+          }
+          catch (Exception e) {
+            if (active) {
+              log.makeAlert(e, "Failed to sync with storage").emit();
             }
           }
-      );
-      requestManagement();
-      // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
-      // This is called after requesting management as locks need to be cleared after notifyStatus is processed
-      for (Task task : tasksToFail) {
-        for (TaskLock lock : taskStorage.getLocks(task.getId())) {
-          taskStorage.removeLock(task.getId(), lock);
+
+          return active ? ScheduledExecutors.Signal.REPEAT : ScheduledExecutors.Signal.STOP;
         }
-      }
-    }
-    finally {
-      giant.unlock();
-    }
+    );
   }
 
   /**
    * Shuts down the queue.
    */
   @LifecycleStop
-  public void stop()
+  public synchronized void stop()
   {
-    giant.lock();
-
-    try {
-      tasks.clear();
-      taskFutures.clear();
-      active = false;
-      managerExec.shutdownNow();
-      storageSyncExec.shutdownNow();
-      requestManagement();
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  public boolean isActive()
-  {
-    return active;
+    active = false;
+    tasks.clear();
+    submittedTaskIds.clear();
+    recentlyCompletedTaskIds.clear();
+    managerExec.shutdownNow();
+    storageSyncExec.shutdownNow();
+    requestManagement();
   }
 
   /**
    * Request management from the management thread. Non-blocking.
-   *
-   * Other callers (such as notifyStatus) should trigger activity on the
-   * TaskQueue thread by requesting management here.
+   * <p>
+   * Callers (such as notifyStatus) can trigger task management by calling
+   * this method.
    */
-  void requestManagement()
+  private void requestManagement()
   {
-    // use a BlockingQueue since the offer/poll/wait behaviour is simple
-    // and very easy to reason about
-
-    // the request has to be offer (non blocking), since someone might request
-    // while already holding giant lock
-
-    // do not care if the item fits into the queue:
-    // if the queue is already full, request has been triggered anyway
-    managementMayBeNecessary.offer(this);
+    synchronized (managementRequested) {
+      managementRequested.set(true);
+      managementRequested.notify();
+    }
   }
 
   /**
-   * Await for an event to manage.
-   *
-   * This should only be called from the management thread to wait for activity.
+   * Waits for a management request to be triggered by another thread.
    *
-   * @param nanos
-   * @throws InterruptedException
+   * @throws InterruptedException if the thread is interrupted while waiting.
    */
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "using queue as notification mechanism, result has no value")
-  void awaitManagementNanos(long nanos) throws InterruptedException
+  private void awaitManagementRequest() throws InterruptedException
   {
     // mitigate a busy loop, it can get pretty busy when there are a lot of start/stops
     try {
-      Thread.sleep(MIN_WAIT_TIME_MS);
+      Thread.sleep(MIN_WAIT_TIME_MILLIS);
     }
     catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
 
-    // wait for an item, if an item arrives (or is already available), complete immediately
-    // (does not actually matter what the item is)
-    managementMayBeNecessary.poll(nanos - (TimeUnit.MILLISECONDS.toNanos(MIN_WAIT_TIME_MS)), TimeUnit.NANOSECONDS);
-
-    // there may have been multiple requests, clear them all
-    managementMayBeNecessary.clear();
+    // Wait for management to be requested
+    synchronized (managementRequested) {
+      while (!managementRequested.get()) {
+        managementRequested.wait(MANAGEMENT_WAIT_TIMEOUT_MILLIS - MIN_WAIT_TIME_MILLIS);
+      }
+      managementRequested.compareAndSet(true, false);
+    }
   }
 
   /**
    * Main task runner management loop. Meant to run forever, or, at least until we're stopped.
    */
-  private void manage() throws InterruptedException
+  private void runTaskManagement() throws InterruptedException
   {
-    log.info("Beginning management in %s.", config.getStartDelay());
-    Thread.sleep(config.getStartDelay().getMillis());
-
     // Ignore return value- we'll get the IDs and futures from getKnownTasks later.
     taskRunner.restore();
 
     while (active) {
-      manageInternal();
-
-      // awaitNanos because management may become necessary without this condition signalling,
-      // due to e.g. tasks becoming ready when other folks mess with the TaskLockbox.
-      awaitManagementNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS);
+      manageTasks();
+      awaitManagementRequest();
     }
   }
 
   @VisibleForTesting
-  void manageInternal()
+  void manageTasks()
   {
-    Set<String> knownTaskIds = new HashSet<>();
-    Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new HashMap<>();
-
-    giant.lock();
-
-    try {
-      manageInternalCritical(knownTaskIds, runnerTaskFutures);
-    }
-    finally {
-      giant.unlock();
-    }
-
-    manageInternalPostCritical(knownTaskIds, runnerTaskFutures);
+    runReadyTasks();
+    killUnknownTasks();
   }
 
 
   /**
-   * Management loop critical section tasks.
-   *
-   * @param knownTaskIds will be modified - filled with known task IDs
-   * @param runnerTaskFutures will be modified - filled with futures related to getting the running tasks
+   * Submits ready tasks to the TaskRunner.
+   * <p>
+   * This method should be called only by the management thread.
    */
-  @GuardedBy("giant")
-  private void manageInternalCritical(
-      final Set<String> knownTaskIds,
-      final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures
-  )
+  private synchronized void runReadyTasks()
   {
     // Task futures available from the taskRunner
+    final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new HashMap<>();
     for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) {
-      if (!recentlyCompletedTasks.contains(workItem.getTaskId())) {
-        // Don't do anything with tasks that have recently finished; notifyStatus will handle it.
+      if (!recentlyCompletedTaskIds.contains(workItem.getTaskId())) {
+        // Don't do anything with recently completed tasks; notifyStatus will handle it.
         runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
       }
     }
+
     // Attain futures for all active tasks (assuming they are ready to run).
-    // Copy tasks list, as notifyStatus may modify it.
-    for (final Task task : ImmutableList.copyOf(tasks.values())) {
-      if (recentlyCompletedTasks.contains(task.getId())) {
-        // Don't do anything with tasks that have recently finished; notifyStatus will handle it.
-        continue;
+    for (final String taskId : Lists.newArrayList(activeTaskIdQueue)) {
+      final Task task = tasks.get(taskId);
+      if (task == null || recentlyCompletedTaskIds.contains(taskId)) {
+        // Don't do anything for unknown tasks or recently completed tasks
+      } else if (submittedTaskIds.contains(taskId)) {
+        // Re-trigger execution of pending task to avoid unnecessary delays
+        // see https://github.com/apache/druid/pull/6991
+        if (isTaskPending(task)) {
+          taskRunner.run(task);
+        }
+      } else if (runnerTaskFutures.containsKey(taskId)) {
+        attachCallbacks(task, runnerTaskFutures.get(taskId));
+        submittedTaskIds.add(taskId);
+      } else if (isTaskReady(task)) {
+        log.info("Asking taskRunner to run ready task [%s].", taskId);
+        attachCallbacks(task, taskRunner.run(task));
+        submittedTaskIds.add(taskId);
+      } else {
+        // Release all locks (possibly acquired by task.isReady()) if task is not ready
+        taskLockbox.unlockAll(task);
       }
+    }
+  }
 
-      knownTaskIds.add(task.getId());
-
-      if (!taskFutures.containsKey(task.getId())) {
-        final ListenableFuture<TaskStatus> runnerTaskFuture;
-        if (runnerTaskFutures.containsKey(task.getId())) {
-          runnerTaskFuture = runnerTaskFutures.get(task.getId());
-        } else {
-          // Task should be running, so run it.
-          final boolean taskIsReady;
-          try {
-            taskIsReady = task.isReady(taskActionClientFactory.create(task));
-          }
-          catch (Exception e) {
-            log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
-            final String errorMessage;
-            if (e instanceof MaxAllowedLocksExceededException) {
-              errorMessage = e.getMessage();
-            } else {
-              errorMessage = "Failed while waiting for the task to be ready to run. "
-                                          + "See overlord logs for more details.";
-            }
-            notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage);
-            continue;
-          }
-          if (taskIsReady) {
-            log.info("Asking taskRunner to run: %s", task.getId());
-            runnerTaskFuture = taskRunner.run(task);
-          } else {
-            // Task.isReady() can internally lock intervals or segments.
-            // We should release them if the task is not ready.
-            taskLockbox.unlockAll(task);
-            continue;
-          }
-        }
-        taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));
-      } else if (isTaskPending(task)) {
-        // if the taskFutures contain this task and this task is pending, also let the taskRunner
-        // to run it to guarantee it will be assigned to run
-        // see https://github.com/apache/druid/pull/6991
-        taskRunner.run(task);
+  private boolean isTaskReady(Task task)
+  {
+    try {
+      return task.isReady(taskActionClientFactory.create(task));
+    }
+    catch (Exception e) {
+      log.warn(e, "Error while checking if task [%s] is ready to run.", task.getId());
+      final String errorMessage;
+      if (e instanceof MaxAllowedLocksExceededException) {
+        errorMessage = e.getMessage();
+      } else {
+        errorMessage = "Failed while waiting for the task to be ready to run. "
+                       + "See overlord logs for more details.";
       }
+      notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage);
+      return false;
     }
   }
 
-  @VisibleForTesting
-  private void manageInternalPostCritical(
-      final Set<String> knownTaskIds,
-      final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures
-  )
+  /**
+   * Kills tasks not present in the set of known tasks.
+   */
+  private void killUnknownTasks()
   {
-    // Kill tasks that shouldn't be running
-    final Set<String> tasksToKill = Sets.difference(runnerTaskFutures.keySet(), knownTaskIds);
-    if (!tasksToKill.isEmpty()) {
-      log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());
-
-      // On large installations running several thousands of tasks,
-      // concatenating the list of known task ids can be compupationally expensive.
-      final boolean logKnownTaskIds = log.isDebugEnabled();
-      final String reason = logKnownTaskIds
-              ? StringUtils.format("Task is not in knownTaskIds[%s]", knownTaskIds)
-              : "Task is not in knownTaskIds";
-
-      for (final String taskId : tasksToKill) {
-        try {
-          taskRunner.shutdown(taskId, reason);
-        }
-        catch (Exception e) {
-          log.warn(e, "TaskRunner failed to clean up task: %s", taskId);
-        }
+    final Set<String> knownTaskIds = tasks.keySet();

Review Comment:
   I have tried to ensure that all the changes made in the patch #12901 are retained.
   
   > This patch does the following:
   > 1. Fixes the race by adding a recentlyCompletedTasks set that prevents
       the main management loop from doing anything with tasks that are
       currently being cleaned up.
       
   The set of `recentlyCompletedTaskIds` is still being used for this purpose.
       
   > 2. Switches the order of the critical sections in notifyStatus, so
       metadata store updates happen first. This is useful in case of
       server failures: it ensures that if the Overlord fails in the midst
       of notifyStatus, then completed-task statuses are still available in
       ZK or on MMs for the next Overlord. (Those are cleaned up by
       taskRunner.shutdown, which formerly ran first.) This isn't related
       to the race described above, but is fixed opportunistically as part
       of the same patch.
   
   The order of calls is unchanged, i.e. the metadata store update still happens first.
   
   > 3. Changes the tasks list to a map. Many operations require retrieval
       or removal of individual tasks; those are now O(1) instead of O(N)
       in the number of running tasks.
   
   - The `tasks` map is now a `ConcurrentHashMap`, which still performs O(1) `get` and `remove`.
   - We now also have an `activeTaskIdQueue`, on which `remove` is O(n). This is acceptable because the removal does not block other `TaskQueue` operations. This structure needs to be a thread-safe queue (or queue-like), so I used a `LinkedBlockingDeque`. I am not aware of an alternative that would offer better time complexity.
       
   > 4. Changes various log messages to use task ID instead of full task
       payload, to make the logs more readable.
   
   Retained as is.
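The trade-off in point 3 can be illustrated with a small, self-contained sketch. `ActiveTaskIndex` is a hypothetical class, not part of Druid: it pairs a `ConcurrentHashMap` (expected O(1) `get`/`remove` by task ID) with a `LinkedBlockingDeque` that preserves submission order but needs a linear scan for `remove(Object)`. Readers iterate over a copy of the deque, similar to `Lists.newArrayList(activeTaskIdQueue)` in `runReadyTasks()`, and re-check the map for each ID. Plain strings stand in for `Task` objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical stand-in for the TaskQueue structures discussed above (not Druid code).
class ActiveTaskIndex
{
  // taskId -> datasource (a String stands in for the real Task object)
  private final Map<String, String> tasks = new ConcurrentHashMap<>();
  // Preserves the order in which task IDs were added
  private final LinkedBlockingDeque<String> activeTaskIdQueue = new LinkedBlockingDeque<>();

  void add(String taskId, String dataSource)
  {
    // putIfAbsent keeps a duplicate add from enqueuing the same ID twice.
    // If remove() runs between these two calls, an orphan ID can remain in the deque;
    // readers tolerate this by skipping IDs that are no longer in the map.
    if (tasks.putIfAbsent(taskId, dataSource) == null) {
      activeTaskIdQueue.offerLast(taskId);
    }
  }

  void remove(String taskId)
  {
    tasks.remove(taskId);              // expected O(1)
    activeTaskIdQueue.remove(taskId);  // O(n): linear scan of the deque
  }

  String get(String taskId)
  {
    return tasks.get(taskId);
  }

  // Copies the IDs so callers can iterate while other threads add or remove tasks.
  List<String> snapshotInOrder()
  {
    return new ArrayList<>(activeTaskIdQueue);
  }

  public static void main(String[] args)
  {
    ActiveTaskIndex index = new ActiveTaskIndex();
    index.add("task_a", "wiki");
    index.add("task_b", "wiki");
    index.add("task_c", "metrics");

    List<String> snapshot = index.snapshotInOrder();
    index.remove("task_b");

    // The snapshot is unaffected by the later removal...
    System.out.println(snapshot); // [task_a, task_b, task_c]
    // ...so each ID must be re-checked against the map (task_b now maps to null).
    for (String id : snapshot) {
      String dataSource = index.get(id);
      if (dataSource != null) {
        System.out.println(id + " -> " + dataSource);
      }
    }
  }
}
```

If ordered O(1) removal ever mattered, one option (not what this patch does) would be a `LinkedHashSet` guarded by a lock, at the cost of serializing access to that structure.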



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] github-code-scanning[bot] commented on a diff in pull request #14293: Remove `giant` lock from Overlord TaskQueue

Posted by "github-code-scanning[bot] (via GitHub)" <gi...@apache.org>.
github-code-scanning[bot] commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1199559407


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +158,246 @@
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while becoming leader"
+      );
+    }
+    requestManagement("Starting TaskQueue");
+    // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared after notifyStatus is processed
+    for (Task task : tasksToFail) {
+      for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+        taskStorage.removeLock(task.getId(), lock);
       }
-      managerExec.submit(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              while (true) {
-                try {
-                  manage();
-                  break;
-                }
-                catch (InterruptedException e) {
-                  log.info("Interrupted, exiting!");
-                  break;
-                }
-                catch (Exception e) {
-                  final long restartDelay = config.getRestartDelay().getMillis();
-                  log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit();
-                  try {
-                    Thread.sleep(restartDelay);
-                  }
-                  catch (InterruptedException e2) {
-                    log.info("Interrupted, exiting!");
-                    break;
-                  }
-                }
-              }
+    }
+    log.info("Cleaned up [%d] tasks which failed to reacquire locks.", tasksToFail.size());
+
+    // Submit task management job
+    managerExec.submit(
+        () -> {
+          log.info("Beginning task management in [%s].", config.getStartDelay());
+          long startDelayMillis = config.getStartDelay().getMillis();
+          while (active) {
+            try {
+              Thread.sleep(startDelayMillis);
+              runTaskManagement();
+            }
+            catch (InterruptedException e) {
+              log.info("Interrupted, stopping task management.");
+              break;
+            }
+            catch (Exception e) {
+              startDelayMillis = config.getRestartDelay().getMillis();
+              log.makeAlert(e, "Failed to manage").addData("restartDelay", startDelayMillis).emit();
             }
           }
-      );
-      ScheduledExecutors.scheduleAtFixedRate(
-          storageSyncExec,
-          config.getStorageSyncRate(),
-          new Callable<ScheduledExecutors.Signal>()
-          {
-            @Override
-            public ScheduledExecutors.Signal call()
-            {
-              try {
-                syncFromStorage();
-              }
-              catch (Exception e) {
-                if (active) {
-                  log.makeAlert(e, "Failed to sync with storage").emit();
-                }
-              }
-              if (active) {
-                return ScheduledExecutors.Signal.REPEAT;
-              } else {
-                return ScheduledExecutors.Signal.STOP;
-              }
+        }
+    );
+
+    // Schedule storage sync job
+    ScheduledExecutors.scheduleAtFixedRate(
+        storageSyncExec,
+        config.getStorageSyncRate(),
+        () -> {
+          if (!active) {
+            log.info("Stopping storage sync as TaskQueue has been stopped");
+            return ScheduledExecutors.Signal.STOP;
+          }
+
+          try {
+            syncFromStorage();
+          }
+          catch (Exception e) {
+            if (active) {
+              log.makeAlert(e, "Failed to sync with storage").emit();
             }
           }
-      );
-      requestManagement();
-      // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
-      // This is called after requesting management as locks need to be cleared after notifyStatus is processed
-      for (Task task : tasksToFail) {
-        for (TaskLock lock : taskStorage.getLocks(task.getId())) {
-          taskStorage.removeLock(task.getId(), lock);
+
+          return active ? ScheduledExecutors.Signal.REPEAT : ScheduledExecutors.Signal.STOP;
         }
-      }
-    }
-    finally {
-      giant.unlock();
-    }
+    );
   }
 
   /**
    * Shuts down the queue.
    */
   @LifecycleStop
-  public void stop()
-  {
-    giant.lock();
-
-    try {
-      tasks.clear();
-      taskFutures.clear();
-      active = false;
-      managerExec.shutdownNow();
-      storageSyncExec.shutdownNow();
-      requestManagement();
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  public boolean isActive()
+  public synchronized void stop()
   {
-    return active;
+    active = false;
+    tasks.clear();
+    submittedTaskIds.clear();
+    recentlyCompletedTaskIds.clear();
+    managerExec.shutdownNow();
+    storageSyncExec.shutdownNow();
+    requestManagement("Stopping TaskQueue");
   }
 
   /**
    * Request management from the management thread. Non-blocking.
-   *
-   * Other callers (such as notifyStatus) should trigger activity on the
-   * TaskQueue thread by requesting management here.
+   * <p>
+   * Callers (such as notifyStatus) can trigger task management by calling
+   * this method.
    */
-  void requestManagement()
+  private void requestManagement(String reason)
   {
-    // use a BlockingQueue since the offer/poll/wait behaviour is simple
-    // and very easy to reason about
-
-    // the request has to be offer (non blocking), since someone might request
-    // while already holding giant lock
-
     // do not care if the item fits into the queue:
     // if the queue is already full, request has been triggered anyway
-    managementMayBeNecessary.offer(this);
+    managementRequestQueue.offer(reason);

Review Comment:
   ## Ignored error status of call
   
   Method requestManagement ignores exceptional return value of BlockingQueue<String>.offer.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4974)
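The alert is arguably benign here: the queue only acts as a wake-up signal, so a rejected `offer()` just means a management request is already pending. A minimal sketch of that coalescing pattern, with the return value surfaced instead of silently dropped, is shown below. `ManagementSignal` and the queue capacity of 1 are assumptions for illustration, not the Druid implementation.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical coalescing wake-up signal built on a bounded queue (not Druid code).
class ManagementSignal
{
  // Capacity 1 is an assumption: the content is irrelevant, only "pending or not" matters.
  private final BlockingQueue<String> requests = new ArrayBlockingQueue<>(1);

  // Non-blocking. Returns false if an earlier request is still pending.
  boolean requestManagement(String reason)
  {
    boolean accepted = requests.offer(reason);
    if (!accepted) {
      System.out.println("Management already requested, coalescing: " + reason);
    }
    return accepted;
  }

  // Waits up to the timeout; returns the pending reason, or null if the timeout elapsed.
  String awaitManagementRequest(long timeout, TimeUnit unit) throws InterruptedException
  {
    String reason = requests.poll(timeout, unit);
    requests.clear(); // several requests collapse into a single management run
    return reason;
  }

  public static void main(String[] args) throws InterruptedException
  {
    ManagementSignal signal = new ManagementSignal();
    System.out.println(signal.requestManagement("task added"));  // true
    // Prints the coalescing message, then false
    System.out.println(signal.requestManagement("task killed"));
    System.out.println(signal.awaitManagementRequest(10, TimeUnit.MILLISECONDS)); // task added
    System.out.println(signal.awaitManagementRequest(10, TimeUnit.MILLISECONDS)); // null
  }
}
```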





[GitHub] [druid] github-code-scanning[bot] commented on a diff in pull request #14293: Remove `giant` lock from Overlord TaskQueue

Posted by "github-code-scanning[bot] (via GitHub)" <gi...@apache.org>.
github-code-scanning[bot] commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1196730351


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +158,245 @@
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared after notifyStatus is processed
+    for (Task task : tasksToFail) {
+      for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+        taskStorage.removeLock(task.getId(), lock);
       }
-      managerExec.submit(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              while (true) {
-                try {
-                  manage();
-                  break;
-                }
-                catch (InterruptedException e) {
-                  log.info("Interrupted, exiting!");
-                  break;
-                }
-                catch (Exception e) {
-                  final long restartDelay = config.getRestartDelay().getMillis();
-                  log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit();
-                  try {
-                    Thread.sleep(restartDelay);
-                  }
-                  catch (InterruptedException e2) {
-                    log.info("Interrupted, exiting!");
-                    break;
-                  }
-                }
-              }
+    }
+    log.info("Cleaned up [%d] tasks which failed to reacquire locks.", tasksToFail.size());
+
+    // Submit task management job
+    managerExec.submit(
+        () -> {
+          log.info("Beginning task management in [%s].", config.getStartDelay());
+          long startDelayMillis = config.getStartDelay().getMillis();
+          while (active) {
+            try {
+              Thread.sleep(startDelayMillis);
+              runTaskManagement();
+            }
+            catch (InterruptedException e) {
+              log.info("Interrupted, stopping task management.");
+              break;
+            }
+            catch (Exception e) {
+              startDelayMillis = config.getRestartDelay().getMillis();
+              log.makeAlert(e, "Failed to manage").addData("restartDelay", startDelayMillis).emit();
             }
           }
-      );
-      ScheduledExecutors.scheduleAtFixedRate(
-          storageSyncExec,
-          config.getStorageSyncRate(),
-          new Callable<ScheduledExecutors.Signal>()
-          {
-            @Override
-            public ScheduledExecutors.Signal call()
-            {
-              try {
-                syncFromStorage();
-              }
-              catch (Exception e) {
-                if (active) {
-                  log.makeAlert(e, "Failed to sync with storage").emit();
-                }
-              }
-              if (active) {
-                return ScheduledExecutors.Signal.REPEAT;
-              } else {
-                return ScheduledExecutors.Signal.STOP;
-              }
+        }
+    );
+
+    // Schedule storage sync job
+    ScheduledExecutors.scheduleAtFixedRate(
+        storageSyncExec,
+        config.getStorageSyncRate(),
+        () -> {
+          if (!active) {
+            log.info("Stopping storage sync as TaskQueue has been stopped");
+            return ScheduledExecutors.Signal.STOP;
+          }
+
+          try {
+            syncFromStorage();
+          }
+          catch (Exception e) {
+            if (active) {
+              log.makeAlert(e, "Failed to sync with storage").emit();
             }
           }
-      );
-      requestManagement();
-      // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
-      // This is called after requesting management as locks need to be cleared after notifyStatus is processed
-      for (Task task : tasksToFail) {
-        for (TaskLock lock : taskStorage.getLocks(task.getId())) {
-          taskStorage.removeLock(task.getId(), lock);
+
+          return active ? ScheduledExecutors.Signal.REPEAT : ScheduledExecutors.Signal.STOP;
         }
-      }
-    }
-    finally {
-      giant.unlock();
-    }
+    );
   }
 
   /**
    * Shuts down the queue.
    */
   @LifecycleStop
-  public void stop()
-  {
-    giant.lock();
-
-    try {
-      tasks.clear();
-      taskFutures.clear();
-      active = false;
-      managerExec.shutdownNow();
-      storageSyncExec.shutdownNow();
-      requestManagement();
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  public boolean isActive()
+  public synchronized void stop()
   {
-    return active;
+    active = false;
+    tasks.clear();
+    submittedTaskIds.clear();
+    recentlyCompletedTaskIds.clear();
+    managerExec.shutdownNow();
+    storageSyncExec.shutdownNow();
+    requestManagement();
   }
 
   /**
    * Request management from the management thread. Non-blocking.
-   *
-   * Other callers (such as notifyStatus) should trigger activity on the
-   * TaskQueue thread by requesting management here.
+   * <p>
+   * Callers (such as notifyStatus) can trigger task management by calling
+   * this method.
    */
-  void requestManagement()
+  private void requestManagement()
   {
-    // use a BlockingQueue since the offer/poll/wait behaviour is simple
-    // and very easy to reason about
-
-    // the request has to be offer (non blocking), since someone might request
-    // while already holding giant lock
-
     // do not care if the item fits into the queue:
     // if the queue is already full, request has been triggered anyway
-    managementMayBeNecessary.offer(this);
+    managementRequestQueue.offer(this);

Review Comment:
   ## Ignored error status of call
   
   Method requestManagement ignores exceptional return value of BlockingQueue<Object>.offer.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4972)
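The later revision shown in the first hunk of this section replaces the queue with an `AtomicBoolean` and `wait`/`notify`, so there is no `offer()` result left to ignore. As written there, the `while (!managementRequested.get())` loop re-enters `wait()` after each timeout, so the management loop wakes only on an explicit request; the removed comment ("awaitNanos because management may become necessary without this condition signalling") suggests periodic wake-ups were once relied upon. Periodic storage syncs do call `requestManagement()`, which may cover this in practice. The sketch below keeps both behaviors by tracking a deadline; `ManagementFlag` is a hypothetical name.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical flag-based management signal with a bounded wait (not Druid code).
class ManagementFlag
{
  private final AtomicBoolean managementRequested = new AtomicBoolean(false);

  void requestManagement()
  {
    synchronized (managementRequested) {
      managementRequested.set(true);
      managementRequested.notifyAll();
    }
  }

  // Returns true if woken by a request, false if the timeout elapsed first.
  // Either way the caller would run one management cycle.
  boolean awaitManagementRequest(long timeoutMillis) throws InterruptedException
  {
    final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    synchronized (managementRequested) {
      while (!managementRequested.get()) {
        long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
        if (remainingMillis <= 0) {
          return false; // periodic wake-up without an explicit request
        }
        // May wake spuriously or on timeout; the loop re-checks both conditions.
        managementRequested.wait(remainingMillis);
      }
      // Consume all pending requests at once
      managementRequested.set(false);
      return true;
    }
  }

  public static void main(String[] args) throws InterruptedException
  {
    ManagementFlag flag = new ManagementFlag();
    System.out.println(flag.awaitManagementRequest(20)); // false: timed out
    flag.requestManagement();
    flag.requestManagement();                            // coalesced with the first
    System.out.println(flag.awaitManagementRequest(20)); // true
    System.out.println(flag.awaitManagementRequest(20)); // false: flag was reset
  }
}
```

Since every access already happens under the monitor, a plain `boolean` field would work as well as the `AtomicBoolean`; the atomic type mainly documents intent.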





[GitHub] [druid] github-code-scanning[bot] commented on a diff in pull request #14293: Remove `giant` lock from Overlord TaskQueue

Posted by "github-code-scanning[bot] (via GitHub)" <gi...@apache.org>.
github-code-scanning[bot] commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1195494819


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -749,131 +651,69 @@
           }
         }
     );
-    return statusFuture;
   }
 
   /**
-   * Resync the contents of this task queue with our storage facility. Useful to make sure our in-memory state
-   * corresponds to the storage facility even if the latter is manually modified.
+   * Resync the contents of this task queue with our storage facility.
+   * Useful to make sure our in-memory state corresponds to the storage facility
+   * even if the latter is manually modified.
+   * <p>
+   * This method must be called only when queue is {@link #active}, except when
+   * starting up.
    */
-  private void syncFromStorage()
+  private synchronized void syncFromStorage()
   {
-    giant.lock();
-
-    try {
-      if (active) {
-        final Map<String, Task> newTasks = toTaskIDMap(taskStorage.getActiveTasks());
-        final int tasksSynced = newTasks.size();
-        final Map<String, Task> oldTasks = new HashMap<>(tasks);
-
-        // Calculate differences on IDs instead of Task Objects.
-        Set<String> commonIds = Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet()));
-        for (String taskID : commonIds) {
-          newTasks.remove(taskID);
-          oldTasks.remove(taskID);
-        }
-        Collection<Task> addedTasks = newTasks.values();
-        Collection<Task> removedTasks = oldTasks.values();
-
-        // Clean up removed Tasks
-        for (Task task : removedTasks) {
-          removeTaskInternal(task.getId());
-        }
-
-        // Add newly Added tasks to the queue
-        for (Task task : addedTasks) {
-          addTaskInternal(task);
-        }
-
-        log.info(
-            "Synced %d tasks from storage (%d tasks added, %d tasks removed).",
-            tasksSynced,
-            addedTasks.size(),
-            removedTasks.size()
-        );
-        requestManagement();
-      } else {
-        log.info("Not active. Skipping storage sync.");
-      }
-    }
-    catch (Exception e) {
-      log.warn(e, "Failed to sync tasks from storage!");
-      throw new RuntimeException(e);
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  private static Map<String, Task> toTaskIDMap(List<Task> taskList)
-  {
-    Map<String, Task> rv = new HashMap<>();
-    for (Task task : taskList) {
-      rv.put(task.getId(), task);
-    }
-    return rv;
-  }
-
-  private Map<String, Long> getDeltaValues(Map<String, Long> total, Map<String, Long> prev)
-  {
-    return total.entrySet()
-                .stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L)));
-  }
+    final Map<String, Task> newTasks = taskStorage.getActiveTasks().stream().collect(
+        Collectors.toMap(Task::getId, Function.identity())
+    );
+    final Map<String, Task> oldTasks = new HashMap<>(tasks);
+
+    // Calculate differences on IDs instead of Task Objects.
+    Set<String> commonIds = Sets.intersection(newTasks.keySet(), oldTasks.keySet());
+    for (String taskId : commonIds) {
+      newTasks.remove(taskId);
+      oldTasks.remove(taskId);
+    }
+    Collection<Task> addedTasks = newTasks.values();
+    Collection<Task> removedTasks = oldTasks.values();
+
+    // Add new tasks and clean up removed tasks
+    addedTasks.forEach(this::addTaskInternal);
+    removedTasks.forEach(this::removeTaskInternal);
+    log.info(
+        "Synced [%d] tasks from storage. Added [%d] tasks, removed [%d] tasks.",
+        newTasks.size(), addedTasks.size(), removedTasks.size()
+    );
 
-  public Map<String, Long> getSuccessfulTaskCount()
-  {
-    Map<String, Long> total = CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get);
-    synchronized (totalSuccessfulTaskCount) {
-      Map<String, Long> delta = getDeltaValues(total, prevTotalSuccessfulTaskCount);
-      prevTotalSuccessfulTaskCount = total;
-      return delta;
-    }
+    requestManagement();
   }
 
-  public Map<String, Long> getFailedTaskCount()
+  public Map<String, Long> getAndResetSuccessfulTaskCounts()
   {
-    Map<String, Long> total = CollectionUtils.mapValues(totalFailedTaskCount, AtomicLong::get);
-    synchronized (totalFailedTaskCount) {
-      Map<String, Long> delta = getDeltaValues(total, prevTotalFailedTaskCount);
-      prevTotalFailedTaskCount = total;
-      return delta;
-    }
+    Map<String, Long> total = new HashMap<>(datasourceToSuccessfulTaskCount);
+    datasourceToSuccessfulTaskCount.clear();
+    return total;
   }
 
-  Map<String, String> getCurrentTaskDatasources()
+  public Map<String, Long> getAndResetFailedTaskCounts()
   {
-    giant.lock();
-    try {
-      return tasks.values().stream().collect(Collectors.toMap(Task::getId, Task::getDataSource));
-    }
-    finally {
-      giant.unlock();
-    }
+    Map<String, Long> total = new HashMap<>(datasourceToFailedTaskCount);
+    datasourceToFailedTaskCount.clear();
+    return total;
   }
 
   public Map<String, Long> getRunningTaskCount()
   {
-    Map<String, String> taskDatasources = getCurrentTaskDatasources();
-    return taskRunner.getRunningTasks()
-                     .stream()
-                     .collect(Collectors.toMap(
-                         e -> taskDatasources.getOrDefault(e.getTaskId(), ""),
-                         e -> 1L,
-                         Long::sum
-                     ));
+    return taskRunner.getRunningTasks().stream().collect(
+        Collectors.toMap(TaskRunnerWorkItem::getDataSource, task -> 1L, Long::sum)

Review Comment:
   ## Useless parameter
   
   The parameter 'task' is never used.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4965)
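For reference, the finding above can be avoided by using `Collectors.groupingBy(..., Collectors.counting())`, which needs no lambda that ignores its argument. The following is a minimal sketch. `WorkItem` is a hypothetical stand-in for `TaskRunnerWorkItem`. A null datasource is mapped to `""`, which mirrors the `getOrDefault(..., "")` fallback in the removed code, because `groupingBy` rejects null keys.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class DatasourceCounts
{
  // Hypothetical stand-in for TaskRunnerWorkItem: only the datasource matters here.
  public static final class WorkItem
  {
    private final String dataSource;

    public WorkItem(String dataSource)
    {
      this.dataSource = dataSource;
    }

    public String getDataSource()
    {
      return dataSource;
    }
  }

  // Counts work items per datasource. groupingBy + counting() needs no lambda
  // that ignores its argument; null datasources are bucketed under "" because
  // groupingBy throws NullPointerException on null keys.
  public static Map<String, Long> countByDatasource(List<WorkItem> items)
  {
    return items.stream().collect(
        Collectors.groupingBy(
            item -> Objects.toString(item.getDataSource(), ""),
            Collectors.counting()
        )
    );
  }

  public static void main(String[] args)
  {
    List<WorkItem> running = Arrays.asList(
        new WorkItem("wiki"), new WorkItem("wiki"), new WorkItem("metrics")
    );
    // Prints the per-datasource counts; HashMap iteration order is not guaranteed.
    System.out.println(countByDatasource(running));
  }
}
```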



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -749,131 +651,69 @@
           }
         }
     );
-    return statusFuture;
   }
 
   /**
-   * Resync the contents of this task queue with our storage facility. Useful to make sure our in-memory state
-   * corresponds to the storage facility even if the latter is manually modified.
+   * Resync the contents of this task queue with our storage facility.
+   * Useful to make sure our in-memory state corresponds to the storage facility
+   * even if the latter is manually modified.
+   * <p>
+   * This method must be called only when queue is {@link #active}, except when
+   * starting up.
    */
-  private void syncFromStorage()
+  private synchronized void syncFromStorage()
   {
-    giant.lock();
-
-    try {
-      if (active) {
-        final Map<String, Task> newTasks = toTaskIDMap(taskStorage.getActiveTasks());
-        final int tasksSynced = newTasks.size();
-        final Map<String, Task> oldTasks = new HashMap<>(tasks);
-
-        // Calculate differences on IDs instead of Task Objects.
-        Set<String> commonIds = Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet()));
-        for (String taskID : commonIds) {
-          newTasks.remove(taskID);
-          oldTasks.remove(taskID);
-        }
-        Collection<Task> addedTasks = newTasks.values();
-        Collection<Task> removedTasks = oldTasks.values();
-
-        // Clean up removed Tasks
-        for (Task task : removedTasks) {
-          removeTaskInternal(task.getId());
-        }
-
-        // Add newly Added tasks to the queue
-        for (Task task : addedTasks) {
-          addTaskInternal(task);
-        }
-
-        log.info(
-            "Synced %d tasks from storage (%d tasks added, %d tasks removed).",
-            tasksSynced,
-            addedTasks.size(),
-            removedTasks.size()
-        );
-        requestManagement();
-      } else {
-        log.info("Not active. Skipping storage sync.");
-      }
-    }
-    catch (Exception e) {
-      log.warn(e, "Failed to sync tasks from storage!");
-      throw new RuntimeException(e);
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  private static Map<String, Task> toTaskIDMap(List<Task> taskList)
-  {
-    Map<String, Task> rv = new HashMap<>();
-    for (Task task : taskList) {
-      rv.put(task.getId(), task);
-    }
-    return rv;
-  }
-
-  private Map<String, Long> getDeltaValues(Map<String, Long> total, Map<String, Long> prev)
-  {
-    return total.entrySet()
-                .stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L)));
-  }
+    final Map<String, Task> newTasks = taskStorage.getActiveTasks().stream().collect(
+        Collectors.toMap(Task::getId, Function.identity())
+    );
+    final Map<String, Task> oldTasks = new HashMap<>(tasks);
+
+    // Calculate differences on IDs instead of Task Objects.
+    Set<String> commonIds = Sets.intersection(newTasks.keySet(), oldTasks.keySet());
+    for (String taskId : commonIds) {
+      newTasks.remove(taskId);
+      oldTasks.remove(taskId);
+    }
+    Collection<Task> addedTasks = newTasks.values();
+    Collection<Task> removedTasks = oldTasks.values();
+
+    // Add new tasks and clean up removed tasks
+    addedTasks.forEach(this::addTaskInternal);
+    removedTasks.forEach(this::removeTaskInternal);
+    log.info(
+        "Synced [%d] tasks from storage. Added [%d] tasks, removed [%d] tasks.",
+        newTasks.size(), addedTasks.size(), removedTasks.size()
+    );
 
-  public Map<String, Long> getSuccessfulTaskCount()
-  {
-    Map<String, Long> total = CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get);
-    synchronized (totalSuccessfulTaskCount) {
-      Map<String, Long> delta = getDeltaValues(total, prevTotalSuccessfulTaskCount);
-      prevTotalSuccessfulTaskCount = total;
-      return delta;
-    }
+    requestManagement();
   }
 
-  public Map<String, Long> getFailedTaskCount()
+  public Map<String, Long> getAndResetSuccessfulTaskCounts()
   {
-    Map<String, Long> total = CollectionUtils.mapValues(totalFailedTaskCount, AtomicLong::get);
-    synchronized (totalFailedTaskCount) {
-      Map<String, Long> delta = getDeltaValues(total, prevTotalFailedTaskCount);
-      prevTotalFailedTaskCount = total;
-      return delta;
-    }
+    Map<String, Long> total = new HashMap<>(datasourceToSuccessfulTaskCount);
+    datasourceToSuccessfulTaskCount.clear();
+    return total;
   }
 
-  Map<String, String> getCurrentTaskDatasources()
+  public Map<String, Long> getAndResetFailedTaskCounts()
   {
-    giant.lock();
-    try {
-      return tasks.values().stream().collect(Collectors.toMap(Task::getId, Task::getDataSource));
-    }
-    finally {
-      giant.unlock();
-    }
+    Map<String, Long> total = new HashMap<>(datasourceToFailedTaskCount);
+    datasourceToFailedTaskCount.clear();
+    return total;
   }
 
   public Map<String, Long> getRunningTaskCount()
   {
-    Map<String, String> taskDatasources = getCurrentTaskDatasources();
-    return taskRunner.getRunningTasks()
-                     .stream()
-                     .collect(Collectors.toMap(
-                         e -> taskDatasources.getOrDefault(e.getTaskId(), ""),
-                         e -> 1L,
-                         Long::sum
-                     ));
+    return taskRunner.getRunningTasks().stream().collect(
+        Collectors.toMap(TaskRunnerWorkItem::getDataSource, task -> 1L, Long::sum)
+    );
   }
 
   public Map<String, Long> getPendingTaskCount()
   {
-    Map<String, String> taskDatasources = getCurrentTaskDatasources();
-    return taskRunner.getPendingTasks()
-                     .stream()
-                     .collect(Collectors.toMap(
-                         e -> taskDatasources.getOrDefault(e.getTaskId(), ""),
-                         e -> 1L,
-                         Long::sum
-                     ));
+    return taskRunner.getPendingTasks().stream().collect(
+        Collectors.toMap(TaskRunnerWorkItem::getDataSource, task -> 1L, Long::sum)

Review Comment:
   ## Useless parameter
   
   The parameter 'task' is never used.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4966)
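The `getAndResetSuccessfulTaskCounts()` and `getAndResetFailedTaskCounts()` changes in this hunk may be worth a check. Copying the map and then calling `clear()` are two separate steps, so an increment that lands between them is discarded. The sketch below assumes that the counters are a `ConcurrentHashMap<String, Long>` updated with `merge`, since the field types are not visible in this diff. It resets the counters key by key with `remove`, which returns and removes each value atomically. `ResettableCounters` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ResettableCounters
{
  // Assumption: per-datasource counts live in a ConcurrentHashMap.
  private final ConcurrentHashMap<String, Long> counts = new ConcurrentHashMap<>();

  public void increment(String dataSource)
  {
    // merge is atomic per key in ConcurrentHashMap
    counts.merge(dataSource, 1L, Long::sum);
  }

  // Returns the counts accumulated since the previous call and resets them.
  // Each key is removed atomically, so a racing increment is either included
  // in this snapshot or left in the map for the next call, never dropped.
  public Map<String, Long> getAndReset()
  {
    Map<String, Long> snapshot = new HashMap<>();
    // ConcurrentHashMap iterators are weakly consistent, so removing while
    // iterating does not throw ConcurrentModificationException.
    for (String dataSource : counts.keySet()) {
      Long value = counts.remove(dataSource);
      if (value != null) {
        snapshot.put(dataSource, value);
      }
    }
    return snapshot;
  }
}
```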



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +153,248 @@
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared after notifyStatus is processed
+    for (Task task : tasksToFail) {
+      for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+        taskStorage.removeLock(task.getId(), lock);
       }
-      managerExec.submit(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              while (true) {
-                try {
-                  manage();
-                  break;
-                }
-                catch (InterruptedException e) {
-                  log.info("Interrupted, exiting!");
-                  break;
-                }
-                catch (Exception e) {
-                  final long restartDelay = config.getRestartDelay().getMillis();
-                  log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit();
-                  try {
-                    Thread.sleep(restartDelay);
-                  }
-                  catch (InterruptedException e2) {
-                    log.info("Interrupted, exiting!");
-                    break;
-                  }
-                }
-              }
+    }
+    log.info("Cleaned up [%d] tasks which failed to reacquire locks.", tasksToFail.size());
+
+    // Submit task management job
+    managerExec.submit(
+        () -> {
+          log.info("Beginning task management in [%s].", config.getStartDelay());
+          long startDelayMillis = config.getStartDelay().getMillis();
+          while (active) {
+            try {
+              Thread.sleep(startDelayMillis);
+              runTaskManagement();
+            }
+            catch (InterruptedException e) {
+              log.info("Interrupted, stopping task management.");
+              break;
+            }
+            catch (Exception e) {
+              startDelayMillis = config.getRestartDelay().getMillis();
+              log.makeAlert(e, "Failed to manage").addData("restartDelay", startDelayMillis).emit();
             }
           }
-      );
-      ScheduledExecutors.scheduleAtFixedRate(
-          storageSyncExec,
-          config.getStorageSyncRate(),
-          new Callable<ScheduledExecutors.Signal>()
-          {
-            @Override
-            public ScheduledExecutors.Signal call()
-            {
-              try {
-                syncFromStorage();
-              }
-              catch (Exception e) {
-                if (active) {
-                  log.makeAlert(e, "Failed to sync with storage").emit();
-                }
-              }
-              if (active) {
-                return ScheduledExecutors.Signal.REPEAT;
-              } else {
-                return ScheduledExecutors.Signal.STOP;
-              }
+        }
+    );
+
+    // Schedule storage sync job
+    ScheduledExecutors.scheduleAtFixedRate(
+        storageSyncExec,
+        config.getStorageSyncRate(),
+        () -> {
+          if (!active) {
+            log.info("Stopping storage sync as TaskQueue has been stopped");
+            return ScheduledExecutors.Signal.STOP;
+          }
+
+          try {
+            syncFromStorage();
+          }
+          catch (Exception e) {
+            if (active) {
+              log.makeAlert(e, "Failed to sync with storage").emit();
             }
           }
-      );
-      requestManagement();
-      // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
-      // This is called after requesting management as locks need to be cleared after notifyStatus is processed
-      for (Task task : tasksToFail) {
-        for (TaskLock lock : taskStorage.getLocks(task.getId())) {
-          taskStorage.removeLock(task.getId(), lock);
+
+          return active ? ScheduledExecutors.Signal.REPEAT : ScheduledExecutors.Signal.STOP;
         }
-      }
-    }
-    finally {
-      giant.unlock();
-    }
+    );
   }
 
   /**
    * Shuts down the queue.
    */
   @LifecycleStop
-  public void stop()
+  public synchronized void stop()
   {
-    giant.lock();
-
-    try {
-      tasks.clear();
-      taskFutures.clear();
-      active = false;
-      managerExec.shutdownNow();
-      storageSyncExec.shutdownNow();
-      requestManagement();
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  public boolean isActive()
-  {
-    return active;
+    active = false;
+    tasks.clear();
+    submittedTaskIds.clear();
+    recentlyCompletedTaskIds.clear();
+    managerExec.shutdownNow();
+    storageSyncExec.shutdownNow();
+    requestManagement();
   }
 
   /**
    * Request management from the management thread. Non-blocking.
-   *
-   * Other callers (such as notifyStatus) should trigger activity on the
-   * TaskQueue thread by requesting management here.
+   * <p>
+   * Callers (such as notifyStatus) can trigger task management by calling
+   * this method.
    */
-  void requestManagement()
+  private void requestManagement()
   {
-    // use a BlockingQueue since the offer/poll/wait behaviour is simple
-    // and very easy to reason about
-
-    // the request has to be offer (non blocking), since someone might request
-    // while already holding giant lock
-
-    // do not care if the item fits into the queue:
-    // if the queue is already full, request has been triggered anyway
-    managementMayBeNecessary.offer(this);
+    synchronized (managementRequested) {
+      managementRequested.set(true);
+      managementRequested.notify();

Review Comment:
   ## notify instead of notifyAll
   
   Using notify rather than notifyAll.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4967)
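The following is a minimal sketch of a flag-plus-monitor signal that uses `notifyAll()`. It assumes a single boolean guarded by a dedicated lock object. `ManagementSignal` and `awaitRequest` are hypothetical names and are not the PR's fields. With exactly one management thread waiting, `notify()` and `notifyAll()` behave the same. `notifyAll()` stays correct if more waiters are ever added, which is the concern the scanner is flagging.

```java
import java.util.concurrent.TimeUnit;

public class ManagementSignal
{
  private final Object lock = new Object();
  private boolean requested = false; // guarded by lock

  // Sets the flag and wakes every waiter; the monitor is held only briefly.
  public void requestManagement()
  {
    synchronized (lock) {
      requested = true;
      lock.notifyAll();
    }
  }

  // Waits up to timeoutMillis for a pending request. Returns true (and clears
  // the flag) if one was pending, false on timeout.
  public boolean awaitRequest(long timeoutMillis) throws InterruptedException
  {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    synchronized (lock) {
      // Re-check in a loop to handle spurious wakeups.
      while (!requested) {
        long remainingNanos = deadline - System.nanoTime();
        if (remainingNanos <= 0) {
          return false;
        }
        // Releases the monitor while waiting, like lock.wait(...)
        TimeUnit.NANOSECONDS.timedWait(lock, remainingNanos);
      }
      requested = false;
      return true;
    }
  }
}
```

The `while (!requested)` loop matters more than which notify variant is used. It handles spurious wakeups, and it also handles a request that arrives before the manager starts waiting.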



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jasonk000 commented on a diff in pull request #14293: Remove `giant` lock from Overlord TaskQueue

Posted by "jasonk000 (via GitHub)" <gi...@apache.org>.
jasonk000 commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1195893770


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -749,131 +651,69 @@ private void handleStatus(final TaskStatus status)
           }
         }
     );
-    return statusFuture;
   }
 
   /**
-   * Resync the contents of this task queue with our storage facility. Useful to make sure our in-memory state
-   * corresponds to the storage facility even if the latter is manually modified.
+   * Resync the contents of this task queue with our storage facility.
+   * Useful to make sure our in-memory state corresponds to the storage facility
+   * even if the latter is manually modified.
+   * <p>
+   * This method must be called only when queue is {@link #active}, except when
+   * starting up.
    */
-  private void syncFromStorage()
+  private synchronized void syncFromStorage()
   {
-    giant.lock();
-
-    try {
-      if (active) {
-        final Map<String, Task> newTasks = toTaskIDMap(taskStorage.getActiveTasks());
-        final int tasksSynced = newTasks.size();
-        final Map<String, Task> oldTasks = new HashMap<>(tasks);
-
-        // Calculate differences on IDs instead of Task Objects.
-        Set<String> commonIds = Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet()));
-        for (String taskID : commonIds) {
-          newTasks.remove(taskID);
-          oldTasks.remove(taskID);
-        }
-        Collection<Task> addedTasks = newTasks.values();
-        Collection<Task> removedTasks = oldTasks.values();
-
-        // Clean up removed Tasks
-        for (Task task : removedTasks) {
-          removeTaskInternal(task.getId());
-        }
-
-        // Add newly Added tasks to the queue
-        for (Task task : addedTasks) {
-          addTaskInternal(task);
-        }
-
-        log.info(
-            "Synced %d tasks from storage (%d tasks added, %d tasks removed).",
-            tasksSynced,
-            addedTasks.size(),
-            removedTasks.size()
-        );
-        requestManagement();
-      } else {
-        log.info("Not active. Skipping storage sync.");
-      }
-    }
-    catch (Exception e) {
-      log.warn(e, "Failed to sync tasks from storage!");
-      throw new RuntimeException(e);
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  private static Map<String, Task> toTaskIDMap(List<Task> taskList)
-  {
-    Map<String, Task> rv = new HashMap<>();
-    for (Task task : taskList) {
-      rv.put(task.getId(), task);
-    }
-    return rv;
-  }
-
-  private Map<String, Long> getDeltaValues(Map<String, Long> total, Map<String, Long> prev)
-  {
-    return total.entrySet()
-                .stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L)));
-  }
+    final Map<String, Task> newTasks = taskStorage.getActiveTasks().stream().collect(
+        Collectors.toMap(Task::getId, Function.identity())
+    );
+    final Map<String, Task> oldTasks = new HashMap<>(tasks);
+
+    // Calculate differences on IDs instead of Task Objects.
+    Set<String> commonIds = Sets.intersection(newTasks.keySet(), oldTasks.keySet());
+    for (String taskId : commonIds) {
+      newTasks.remove(taskId);
+      oldTasks.remove(taskId);
+    }
+    Collection<Task> addedTasks = newTasks.values();
+    Collection<Task> removedTasks = oldTasks.values();
+
+    // Add new tasks and clean up removed tasks
+    addedTasks.forEach(this::addTaskInternal);
+    removedTasks.forEach(this::removeTaskInternal);
+    log.info(

Review Comment:
   Could use `synchronized (this) { ... }`?
   
   
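One possible reading of this suggestion is to keep the potentially slow `taskStorage.getActiveTasks()` call outside the monitor and synchronize only the part that diffs and mutates `tasks`. The sketch below assumes that interpretation. `NarrowLockSync` and the `Supplier` standing in for task storage are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

public class NarrowLockSync
{
  // In-memory view of active tasks: task id -> datasource. Guarded by "this".
  private final Map<String, String> tasks = new HashMap<>();

  // Hypothetical stand-in for taskStorage.getActiveTasks(), keyed by task id.
  private final Supplier<Map<String, String>> storage;

  public NarrowLockSync(Supplier<Map<String, String>> storage)
  {
    this.storage = storage;
  }

  public synchronized void add(String taskId, String dataSource)
  {
    tasks.put(taskId, dataSource);
  }

  public synchronized int size()
  {
    return tasks.size();
  }

  // Returns {added, removed}.
  public int[] syncFromStorage()
  {
    // Potentially slow metadata read, done without holding the monitor.
    Map<String, String> fromStorage = storage.get();

    synchronized (this) {
      // Diff on task IDs, as in the PR.
      Set<String> removed = new HashSet<>(tasks.keySet());
      removed.removeAll(fromStorage.keySet());
      Set<String> added = new HashSet<>(fromStorage.keySet());
      added.removeAll(tasks.keySet());

      tasks.keySet().removeAll(removed);
      for (String taskId : added) {
        tasks.put(taskId, fromStorage.get(taskId));
      }
      return new int[]{added.size(), removed.size()};
    }
  }
}
```

The trade-off is that the snapshot can be stale by the time the lock is taken. A task added to the queue after the read but before the `synchronized` block would look like a removed task to this diff. Narrowing the lock this way therefore needs some guard against that race.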





[GitHub] [druid] kfaraz commented on a diff in pull request #14293: Remove `giant` lock from Overlord TaskQueue

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1195890450


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +153,248 @@ public TaskQueue(
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared after notifyStatus is processed
+    for (Task task : tasksToFail) {
+      for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+        taskStorage.removeLock(task.getId(), lock);
       }
-      managerExec.submit(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              while (true) {
-                try {
-                  manage();
-                  break;
-                }
-                catch (InterruptedException e) {
-                  log.info("Interrupted, exiting!");
-                  break;
-                }
-                catch (Exception e) {
-                  final long restartDelay = config.getRestartDelay().getMillis();
-                  log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit();
-                  try {
-                    Thread.sleep(restartDelay);
-                  }
-                  catch (InterruptedException e2) {
-                    log.info("Interrupted, exiting!");
-                    break;
-                  }
-                }
-              }
+    }
+    log.info("Cleaned up [%d] tasks which failed to reacquire locks.", tasksToFail.size());
+
+    // Submit task management job
+    managerExec.submit(
+        () -> {
+          log.info("Beginning task management in [%s].", config.getStartDelay());
+          long startDelayMillis = config.getStartDelay().getMillis();
+          while (active) {
+            try {
+              Thread.sleep(startDelayMillis);
+              runTaskManagement();
+            }
+            catch (InterruptedException e) {
+              log.info("Interrupted, stopping task management.");
+              break;
+            }
+            catch (Exception e) {
+              startDelayMillis = config.getRestartDelay().getMillis();
+              log.makeAlert(e, "Failed to manage").addData("restartDelay", startDelayMillis).emit();
             }
           }
-      );
-      ScheduledExecutors.scheduleAtFixedRate(
-          storageSyncExec,
-          config.getStorageSyncRate(),
-          new Callable<ScheduledExecutors.Signal>()
-          {
-            @Override
-            public ScheduledExecutors.Signal call()
-            {
-              try {
-                syncFromStorage();
-              }
-              catch (Exception e) {
-                if (active) {
-                  log.makeAlert(e, "Failed to sync with storage").emit();
-                }
-              }
-              if (active) {
-                return ScheduledExecutors.Signal.REPEAT;
-              } else {
-                return ScheduledExecutors.Signal.STOP;
-              }
+        }
+    );
+
+    // Schedule storage sync job
+    ScheduledExecutors.scheduleAtFixedRate(
+        storageSyncExec,
+        config.getStorageSyncRate(),
+        () -> {
+          if (!active) {
+            log.info("Stopping storage sync as TaskQueue has been stopped");
+            return ScheduledExecutors.Signal.STOP;
+          }
+
+          try {
+            syncFromStorage();
+          }
+          catch (Exception e) {
+            if (active) {
+              log.makeAlert(e, "Failed to sync with storage").emit();
             }
           }
-      );
-      requestManagement();
-      // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
-      // This is called after requesting management as locks need to be cleared after notifyStatus is processed
-      for (Task task : tasksToFail) {
-        for (TaskLock lock : taskStorage.getLocks(task.getId())) {
-          taskStorage.removeLock(task.getId(), lock);
+
+          return active ? ScheduledExecutors.Signal.REPEAT : ScheduledExecutors.Signal.STOP;
         }
-      }
-    }
-    finally {
-      giant.unlock();
-    }
+    );
   }
 
   /**
    * Shuts down the queue.
    */
   @LifecycleStop
-  public void stop()
+  public synchronized void stop()
   {
-    giant.lock();
-
-    try {
-      tasks.clear();
-      taskFutures.clear();
-      active = false;
-      managerExec.shutdownNow();
-      storageSyncExec.shutdownNow();
-      requestManagement();
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  public boolean isActive()
-  {
-    return active;
+    active = false;
+    tasks.clear();
+    submittedTaskIds.clear();
+    recentlyCompletedTaskIds.clear();
+    managerExec.shutdownNow();
+    storageSyncExec.shutdownNow();
+    requestManagement();
   }
 
   /**
    * Request management from the management thread. Non-blocking.

Review Comment:
   Hmm, let me reevaluate this. Maybe we can just stick with the `BlockingDeque` that you had earlier.
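For comparison, the pre-PR `requestManagement()` offered to a bounded `BlockingQueue` named `managementMayBeNecessary` and relied on `offer` never blocking. If the queue was full, a request was already pending. Below is a minimal sketch of that pattern with a consumer side added. The capacity of 8 and the `poll`-with-timeout wait are assumptions, because the original capacity and wait logic are not shown here. `QueueManagementSignal` is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueManagementSignal
{
  // Assumed capacity; any small bound works because extra requests are redundant.
  private final BlockingQueue<Object> managementMayBeNecessary = new ArrayBlockingQueue<>(8);

  // Never blocks: offer returns false when the queue is full, and in that case
  // a request is already pending, so dropping this one is harmless.
  public void requestManagement()
  {
    managementMayBeNecessary.offer(this);
  }

  // Waits up to timeoutMillis for at least one request, then drains the rest so
  // that a burst of requests triggers a single management pass.
  // Returns true if any request was seen.
  public boolean awaitRequest(long timeoutMillis) throws InterruptedException
  {
    Object first = managementMayBeNecessary.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    if (first == null) {
      return false;
    }
    managementMayBeNecessary.clear();
    return true;
  }
}
```

Draining after the first `poll` collapses a burst of requests into one management pass. Any request cleared this way was offered before the pass runs, so that pass still covers it.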





Re: [PR] Remove `giant` lock from Overlord TaskQueue (druid)

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #14293:
URL: https://github.com/apache/druid/pull/14293#issuecomment-1989680282

   This pull request/issue has been closed due to lack of activity. If you think that
   is incorrect, or the pull request requires review, you can revive the PR at any time.




Re: [PR] Remove `giant` lock from Overlord TaskQueue (druid)

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #14293: Remove `giant` lock from Overlord TaskQueue
URL: https://github.com/apache/druid/pull/14293




[GitHub] [druid] kfaraz commented on a diff in pull request #14293: Remove `giant` lock from Overlord TaskQueue

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1195888479


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +153,248 @@ public TaskQueue(
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared after notifyStatus is processed
+    for (Task task : tasksToFail) {
+      for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+        taskStorage.removeLock(task.getId(), lock);
       }
-      managerExec.submit(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              while (true) {
-                try {
-                  manage();
-                  break;
-                }
-                catch (InterruptedException e) {
-                  log.info("Interrupted, exiting!");
-                  break;
-                }
-                catch (Exception e) {
-                  final long restartDelay = config.getRestartDelay().getMillis();
-                  log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit();
-                  try {
-                    Thread.sleep(restartDelay);
-                  }
-                  catch (InterruptedException e2) {
-                    log.info("Interrupted, exiting!");
-                    break;
-                  }
-                }
-              }
+    }
+    log.info("Cleaned up [%d] tasks which failed to reacquire locks.", tasksToFail.size());
+
+    // Submit task management job
+    managerExec.submit(
+        () -> {
+          log.info("Beginning task management in [%s].", config.getStartDelay());
+          long startDelayMillis = config.getStartDelay().getMillis();
+          while (active) {
+            try {
+              Thread.sleep(startDelayMillis);
+              runTaskManagement();
+            }
+            catch (InterruptedException e) {
+              log.info("Interrupted, stopping task management.");
+              break;
+            }
+            catch (Exception e) {
+              startDelayMillis = config.getRestartDelay().getMillis();
+              log.makeAlert(e, "Failed to manage").addData("restartDelay", startDelayMillis).emit();
             }
           }
-      );
-      ScheduledExecutors.scheduleAtFixedRate(
-          storageSyncExec,
-          config.getStorageSyncRate(),
-          new Callable<ScheduledExecutors.Signal>()
-          {
-            @Override
-            public ScheduledExecutors.Signal call()
-            {
-              try {
-                syncFromStorage();
-              }
-              catch (Exception e) {
-                if (active) {
-                  log.makeAlert(e, "Failed to sync with storage").emit();
-                }
-              }
-              if (active) {
-                return ScheduledExecutors.Signal.REPEAT;
-              } else {
-                return ScheduledExecutors.Signal.STOP;
-              }
+        }
+    );
+
+    // Schedule storage sync job
+    ScheduledExecutors.scheduleAtFixedRate(
+        storageSyncExec,
+        config.getStorageSyncRate(),
+        () -> {
+          if (!active) {
+            log.info("Stopping storage sync as TaskQueue has been stopped");
+            return ScheduledExecutors.Signal.STOP;
+          }
+
+          try {
+            syncFromStorage();
+          }
+          catch (Exception e) {
+            if (active) {
+              log.makeAlert(e, "Failed to sync with storage").emit();
             }
           }
-      );
-      requestManagement();
-      // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
-      // This is called after requesting management as locks need to be cleared after notifyStatus is processed
-      for (Task task : tasksToFail) {
-        for (TaskLock lock : taskStorage.getLocks(task.getId())) {
-          taskStorage.removeLock(task.getId(), lock);
+
+          return active ? ScheduledExecutors.Signal.REPEAT : ScheduledExecutors.Signal.STOP;
         }
-      }
-    }
-    finally {
-      giant.unlock();
-    }
+    );
   }
 
   /**
    * Shuts down the queue.
    */
   @LifecycleStop
-  public void stop()
+  public synchronized void stop()
   {
-    giant.lock();
-
-    try {
-      tasks.clear();
-      taskFutures.clear();
-      active = false;
-      managerExec.shutdownNow();
-      storageSyncExec.shutdownNow();
-      requestManagement();
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  public boolean isActive()
-  {
-    return active;
+    active = false;
+    tasks.clear();
+    submittedTaskIds.clear();
+    recentlyCompletedTaskIds.clear();
+    managerExec.shutdownNow();
+    storageSyncExec.shutdownNow();
+    requestManagement();
   }
 
   /**
    * Request management from the management thread. Non-blocking.
-   *
-   * Other callers (such as notifyStatus) should trigger activity on the
-   * TaskQueue thread by requesting management here.
+   * <p>
+   * Callers (such as notifyStatus) can trigger task management by calling
+   * this method.
    */
-  void requestManagement()
+  private void requestManagement()
   {
-    // use a BlockingQueue since the offer/poll/wait behaviour is simple
-    // and very easy to reason about
-
-    // the request has to be offer (non blocking), since someone might request
-    // while already holding giant lock
-
-    // do not care if the item fits into the queue:
-    // if the queue is already full, request has been triggered anyway
-    managementMayBeNecessary.offer(this);
+    synchronized (managementRequested) {
+      managementRequested.set(true);
+      managementRequested.notify();
+    }
   }
 
   /**
-   * Await for an event to manage.
-   *
-   * This should only be called from the management thread to wait for activity.
+   * Waits for a management request to be triggered by another thread.
    *
-   * @param nanos
-   * @throws InterruptedException
+   * @throws InterruptedException if the thread is interrupted while waiting.
    */
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "using queue as notification mechanism, result has no value")
-  void awaitManagementNanos(long nanos) throws InterruptedException
+  private void awaitManagementRequest() throws InterruptedException
   {
     // mitigate a busy loop, it can get pretty busy when there are a lot of start/stops
     try {
-      Thread.sleep(MIN_WAIT_TIME_MS);
+      Thread.sleep(MIN_WAIT_TIME_MILLIS);
     }
     catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
 
-    // wait for an item, if an item arrives (or is already available), complete immediately
-    // (does not actually matter what the item is)
-    managementMayBeNecessary.poll(nanos - (TimeUnit.MILLISECONDS.toNanos(MIN_WAIT_TIME_MS)), TimeUnit.NANOSECONDS);
-
-    // there may have been multiple requests, clear them all
-    managementMayBeNecessary.clear();
+    // Wait for management to be requested
+    synchronized (managementRequested) {
+      while (!managementRequested.get()) {
+        managementRequested.wait(MANAGEMENT_WAIT_TIMEOUT_MILLIS - MIN_WAIT_TIME_MILLIS);
+      }
+      managementRequested.compareAndSet(true, false);
+    }
   }
 
   /**
    * Main task runner management loop. Meant to run forever, or, at least until we're stopped.
    */
-  private void manage() throws InterruptedException
+  private void runTaskManagement() throws InterruptedException
   {
-    log.info("Beginning management in %s.", config.getStartDelay());
-    Thread.sleep(config.getStartDelay().getMillis());
-
     // Ignore return value- we'll get the IDs and futures from getKnownTasks later.
     taskRunner.restore();
 
     while (active) {
-      manageInternal();
-
-      // awaitNanos because management may become necessary without this condition signalling,
-      // due to e.g. tasks becoming ready when other folks mess with the TaskLockbox.
-      awaitManagementNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS);
+      manageTasks();
+      awaitManagementRequest();
     }
   }
 
   @VisibleForTesting
-  void manageInternal()
+  void manageTasks()
   {
-    Set<String> knownTaskIds = new HashSet<>();
-    Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new HashMap<>();
-
-    giant.lock();
-
-    try {
-      manageInternalCritical(knownTaskIds, runnerTaskFutures);
-    }
-    finally {
-      giant.unlock();
-    }
-
-    manageInternalPostCritical(knownTaskIds, runnerTaskFutures);
+    runReadyTasks();
+    killUnknownTasks();
   }
 
 
   /**
-   * Management loop critical section tasks.
-   *
-   * @param knownTaskIds will be modified - filled with known task IDs
-   * @param runnerTaskFutures will be modified - filled with futures related to getting the running tasks
+   * Submits ready tasks to the TaskRunner.
+   * <p>
+   * This method should be called only by the management thread.
    */
-  @GuardedBy("giant")
-  private void manageInternalCritical(
-      final Set<String> knownTaskIds,
-      final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures
-  )
+  private synchronized void runReadyTasks()
   {
     // Task futures available from the taskRunner
+    final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new HashMap<>();
     for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) {
-      if (!recentlyCompletedTasks.contains(workItem.getTaskId())) {
-        // Don't do anything with tasks that have recently finished; notifyStatus will handle it.
+      if (!recentlyCompletedTaskIds.contains(workItem.getTaskId())) {
+        // Don't do anything with recently completed tasks; notifyStatus will handle it.
         runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
       }
     }
+
     // Attain futures for all active tasks (assuming they are ready to run).
-    // Copy tasks list, as notifyStatus may modify it.
-    for (final Task task : ImmutableList.copyOf(tasks.values())) {
-      if (recentlyCompletedTasks.contains(task.getId())) {
-        // Don't do anything with tasks that have recently finished; notifyStatus will handle it.
-        continue;
+    for (final String taskId : Lists.newArrayList(activeTaskIdQueue)) {
+      final Task task = tasks.get(taskId);
+      if (task == null || recentlyCompletedTaskIds.contains(taskId)) {
+        // Don't do anything for unknown tasks or recently completed tasks
+      } else if (submittedTaskIds.contains(taskId)) {
+        // Re-trigger execution of pending task to avoid unnecessary delays
+        // see https://github.com/apache/druid/pull/6991
+        if (isTaskPending(task)) {
+          taskRunner.run(task);
+        }
+      } else if (runnerTaskFutures.containsKey(taskId)) {
+        attachCallbacks(task, runnerTaskFutures.get(taskId));
+        submittedTaskIds.add(taskId);
+      } else if (isTaskReady(task)) {
+        log.info("Asking taskRunner to run ready task [%s].", taskId);
+        attachCallbacks(task, taskRunner.run(task));
+        submittedTaskIds.add(taskId);
+      } else {
+        // Release all locks (possibly acquired by task.isReady()) if task is not ready
+        taskLockbox.unlockAll(task);
       }
+    }
+  }
 
-      knownTaskIds.add(task.getId());
-
-      if (!taskFutures.containsKey(task.getId())) {
-        final ListenableFuture<TaskStatus> runnerTaskFuture;
-        if (runnerTaskFutures.containsKey(task.getId())) {
-          runnerTaskFuture = runnerTaskFutures.get(task.getId());
-        } else {
-          // Task should be running, so run it.
-          final boolean taskIsReady;
-          try {
-            taskIsReady = task.isReady(taskActionClientFactory.create(task));
-          }
-          catch (Exception e) {
-            log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
-            final String errorMessage;
-            if (e instanceof MaxAllowedLocksExceededException) {
-              errorMessage = e.getMessage();
-            } else {
-              errorMessage = "Failed while waiting for the task to be ready to run. "
-                                          + "See overlord logs for more details.";
-            }
-            notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage);
-            continue;
-          }
-          if (taskIsReady) {
-            log.info("Asking taskRunner to run: %s", task.getId());
-            runnerTaskFuture = taskRunner.run(task);
-          } else {
-            // Task.isReady() can internally lock intervals or segments.
-            // We should release them if the task is not ready.
-            taskLockbox.unlockAll(task);
-            continue;
-          }
-        }
-        taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));
-      } else if (isTaskPending(task)) {
-        // if the taskFutures contain this task and this task is pending, also let the taskRunner
-        // to run it to guarantee it will be assigned to run
-        // see https://github.com/apache/druid/pull/6991
-        taskRunner.run(task);
+  private boolean isTaskReady(Task task)
+  {
+    try {
+      return task.isReady(taskActionClientFactory.create(task));
+    }
+    catch (Exception e) {
+      log.warn(e, "Error while checking if task [%s] is ready to run.", task.getId());
+      final String errorMessage;
+      if (e instanceof MaxAllowedLocksExceededException) {
+        errorMessage = e.getMessage();
+      } else {
+        errorMessage = "Failed while waiting for the task to be ready to run. "
+                       + "See overlord logs for more details.";
       }
+      notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage);
+      return false;
     }
   }
 
-  @VisibleForTesting
-  private void manageInternalPostCritical(
-      final Set<String> knownTaskIds,
-      final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures
-  )
+  /**
+   * Kills tasks not present in the set of known tasks.
+   */
+  private void killUnknownTasks()
   {
-    // Kill tasks that shouldn't be running
-    final Set<String> tasksToKill = Sets.difference(runnerTaskFutures.keySet(), knownTaskIds);
-    if (!tasksToKill.isEmpty()) {
-      log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());
-
-      // On large installations running several thousands of tasks,
-      // concatenating the list of known task ids can be compupationally expensive.
-      final boolean logKnownTaskIds = log.isDebugEnabled();
-      final String reason = logKnownTaskIds
-              ? StringUtils.format("Task is not in knownTaskIds[%s]", knownTaskIds)
-              : "Task is not in knownTaskIds";
-
-      for (final String taskId : tasksToKill) {
-        try {
-          taskRunner.shutdown(taskId, reason);
-        }
-        catch (Exception e) {
-          log.warn(e, "TaskRunner failed to clean up task: %s", taskId);
-        }
+    final Set<String> knownTaskIds = tasks.keySet();

Review Comment:
   The changes in #12099 were a lot of help in cleaning up the `TaskQueue` in this patch. 
   
   The major difference from #12099 is that I now synchronize on an `AtomicBoolean managementRequested`, instead of using a queue, to track whether management needs to be done. I made this choice for readability and to make the intent clearer.
   
   I think you had called out this possible approach here:
   > I chose a BlockingQueue implementation because it is easy to reason about the submission / poll / offer ordering. Other options would be Semaphore etc.
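   Below is a minimal sketch of the flag-plus-monitor pattern described above. It assumes a standalone helper class; `ManagementSignal` and its method signatures are hypothetical and not part of Druid. One behavioural difference is worth checking. In the diff, `awaitManagementRequest()` re-enters `wait()` after every timeout, so it returns only once a request arrives. The removed `poll(nanos, ...)` also returned on timeout, and the old comment in `manage()` relied on that ("management may become necessary without this condition signalling"). This sketch returns on either a request or the deadline.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper illustrating the AtomicBoolean-as-monitor pattern;
// names are illustrative, not Druid's actual API.
final class ManagementSignal
{
  private final AtomicBoolean managementRequested = new AtomicBoolean(false);

  /** Sets the flag and wakes the waiting management thread. */
  void requestManagement()
  {
    synchronized (managementRequested) {
      managementRequested.set(true);
      // notifyAll is harmless with a single waiter and safe if there are more
      managementRequested.notifyAll();
    }
  }

  /**
   * Waits until management is requested or the timeout elapses.
   * Returns true if a pending request was consumed, false on timeout.
   * Any number of requests made before this call collapse into one.
   */
  boolean awaitManagementRequest(long timeoutMillis) throws InterruptedException
  {
    final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    synchronized (managementRequested) {
      // Loop to guard against spurious wake-ups
      while (!managementRequested.get()) {
        final long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime());
        if (remainingMillis <= 0) {
          return false;
        }
        // wait() releases the monitor, so requesters only contend for it briefly
        managementRequested.wait(remainingMillis);
      }
      managementRequested.set(false);
      return true;
    }
  }

  public static void main(String[] args) throws InterruptedException
  {
    final ManagementSignal signal = new ManagementSignal();
    System.out.println(signal.awaitManagementRequest(50)); // prints false (timed out)

    signal.requestManagement();
    signal.requestManagement();
    System.out.println(signal.awaitManagementRequest(50)); // prints true
    System.out.println(signal.awaitManagementRequest(50)); // prints false (requests collapsed)

    final Thread requester = new Thread(() -> {
      try {
        Thread.sleep(20);
      }
      catch (InterruptedException ignored) {
        Thread.currentThread().interrupt();
      }
      signal.requestManagement();
    });
    requester.start();
    System.out.println(signal.awaitManagementRequest(5_000)); // prints true
    requester.join();
  }
}
```

   Holding the monitor only for the flag check keeps `requestManagement()` short, because the waiting thread gives up the monitor inside `wait()`.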



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] kfaraz commented on a diff in pull request #14293: Remove `giant` lock from Overlord TaskQueue

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1195899385


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -749,131 +651,69 @@ private void handleStatus(final TaskStatus status)
           }
         }
     );
-    return statusFuture;
   }
 
   /**
-   * Resync the contents of this task queue with our storage facility. Useful to make sure our in-memory state
-   * corresponds to the storage facility even if the latter is manually modified.
+   * Resync the contents of this task queue with our storage facility.
+   * Useful to make sure our in-memory state corresponds to the storage facility
+   * even if the latter is manually modified.
+   * <p>
+   * This method must be called only when queue is {@link #active}, except when
+   * starting up.
    */
-  private void syncFromStorage()
+  private synchronized void syncFromStorage()
   {
-    giant.lock();
-
-    try {
-      if (active) {
-        final Map<String, Task> newTasks = toTaskIDMap(taskStorage.getActiveTasks());
-        final int tasksSynced = newTasks.size();
-        final Map<String, Task> oldTasks = new HashMap<>(tasks);
-
-        // Calculate differences on IDs instead of Task Objects.
-        Set<String> commonIds = Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet()));
-        for (String taskID : commonIds) {
-          newTasks.remove(taskID);
-          oldTasks.remove(taskID);
-        }
-        Collection<Task> addedTasks = newTasks.values();
-        Collection<Task> removedTasks = oldTasks.values();
-
-        // Clean up removed Tasks
-        for (Task task : removedTasks) {
-          removeTaskInternal(task.getId());
-        }
-
-        // Add newly Added tasks to the queue
-        for (Task task : addedTasks) {
-          addTaskInternal(task);
-        }
-
-        log.info(
-            "Synced %d tasks from storage (%d tasks added, %d tasks removed).",
-            tasksSynced,
-            addedTasks.size(),
-            removedTasks.size()
-        );
-        requestManagement();
-      } else {
-        log.info("Not active. Skipping storage sync.");
-      }
-    }
-    catch (Exception e) {
-      log.warn(e, "Failed to sync tasks from storage!");
-      throw new RuntimeException(e);
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  private static Map<String, Task> toTaskIDMap(List<Task> taskList)
-  {
-    Map<String, Task> rv = new HashMap<>();
-    for (Task task : taskList) {
-      rv.put(task.getId(), task);
-    }
-    return rv;
-  }
-
-  private Map<String, Long> getDeltaValues(Map<String, Long> total, Map<String, Long> prev)
-  {
-    return total.entrySet()
-                .stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L)));
-  }
+    final Map<String, Task> newTasks = taskStorage.getActiveTasks().stream().collect(
+        Collectors.toMap(Task::getId, Function.identity())
+    );
+    final Map<String, Task> oldTasks = new HashMap<>(tasks);
+
+    // Calculate differences on IDs instead of Task Objects.
+    Set<String> commonIds = Sets.intersection(newTasks.keySet(), oldTasks.keySet());
+    for (String taskId : commonIds) {
+      newTasks.remove(taskId);
+      oldTasks.remove(taskId);
+    }
+    Collection<Task> addedTasks = newTasks.values();
+    Collection<Task> removedTasks = oldTasks.values();
+
+    // Add new tasks and clean up removed tasks
+    addedTasks.forEach(this::addTaskInternal);
+    removedTasks.forEach(this::removeTaskInternal);
+    log.info(
+        "Synced [%d] tasks from storage. Added [%d] tasks, removed [%d] tasks.",
+        newTasks.size(), addedTasks.size(), removedTasks.size()
+    );
 
-  public Map<String, Long> getSuccessfulTaskCount()
-  {
-    Map<String, Long> total = CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get);
-    synchronized (totalSuccessfulTaskCount) {
-      Map<String, Long> delta = getDeltaValues(total, prevTotalSuccessfulTaskCount);
-      prevTotalSuccessfulTaskCount = total;
-      return delta;
-    }
+    requestManagement();
   }
 
-  public Map<String, Long> getFailedTaskCount()
+  public Map<String, Long> getAndResetSuccessfulTaskCounts()
   {
-    Map<String, Long> total = CollectionUtils.mapValues(totalFailedTaskCount, AtomicLong::get);
-    synchronized (totalFailedTaskCount) {
-      Map<String, Long> delta = getDeltaValues(total, prevTotalFailedTaskCount);
-      prevTotalFailedTaskCount = total;
-      return delta;
-    }
+    Map<String, Long> total = new HashMap<>(datasourceToSuccessfulTaskCount);
+    datasourceToSuccessfulTaskCount.clear();

Review Comment:
   Yeah, I didn't like this either. Updated to iterate over keys and atomically remove them one by one.
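   Below is a sketch of the iterate-and-remove approach. It assumes the counts live in a `ConcurrentHashMap<String, Long>` updated via `merge()`. The field's actual type is not shown in this hunk, and `TaskCounts` is a hypothetical name. Copying the map and then calling `clear()`, as in the quoted diff, can drop an increment that lands between the two calls. Removing each key individually cannot, because `remove(key)` atomically returns the count at that instant, and any later increment recreates the entry for the next call.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical holder for per-datasource success counts; names are illustrative.
final class TaskCounts
{
  private final ConcurrentHashMap<String, Long> datasourceToSuccessfulTaskCount = new ConcurrentHashMap<>();

  void incrementSuccessfulTaskCount(String datasource)
  {
    // merge() is atomic for a single key
    datasourceToSuccessfulTaskCount.merge(datasource, 1L, Long::sum);
  }

  Map<String, Long> getAndResetSuccessfulTaskCounts()
  {
    final Map<String, Long> snapshot = new HashMap<>();
    // ConcurrentHashMap iterators are weakly consistent and never throw
    // ConcurrentModificationException, so removing while iterating is safe
    for (String datasource : datasourceToSuccessfulTaskCount.keySet()) {
      final Long count = datasourceToSuccessfulTaskCount.remove(datasource);
      if (count != null) {
        snapshot.put(datasource, count);
      }
    }
    return snapshot;
  }

  public static void main(String[] args)
  {
    final TaskCounts counts = new TaskCounts();
    counts.incrementSuccessfulTaskCount("wiki");
    counts.incrementSuccessfulTaskCount("wiki");
    counts.incrementSuccessfulTaskCount("koala");
    final Map<String, Long> first = counts.getAndResetSuccessfulTaskCounts();
    System.out.println(first.get("wiki") + " " + first.get("koala")); // prints "2 1"
    System.out.println(counts.getAndResetSuccessfulTaskCounts().isEmpty()); // prints "true"
  }
}
```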



[GitHub] [druid] jasonk000 commented on a diff in pull request #14293: Remove `giant` lock from Overlord TaskQueue

Posted by "jasonk000 (via GitHub)" <gi...@apache.org>.
jasonk000 commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1195894064


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -749,131 +651,69 @@ private void handleStatus(final TaskStatus status)
           }
         }
     );
-    return statusFuture;
   }
 
   /**
-   * Resync the contents of this task queue with our storage facility. Useful to make sure our in-memory state
-   * corresponds to the storage facility even if the latter is manually modified.
+   * Resync the contents of this task queue with our storage facility.
+   * Useful to make sure our in-memory state corresponds to the storage facility
+   * even if the latter is manually modified.
+   * <p>
+   * This method must be called only when queue is {@link #active}, except when
+   * starting up.
    */
-  private void syncFromStorage()
+  private synchronized void syncFromStorage()
   {
-    giant.lock();
-
-    try {
-      if (active) {
-        final Map<String, Task> newTasks = toTaskIDMap(taskStorage.getActiveTasks());
-        final int tasksSynced = newTasks.size();
-        final Map<String, Task> oldTasks = new HashMap<>(tasks);
-
-        // Calculate differences on IDs instead of Task Objects.
-        Set<String> commonIds = Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet()));
-        for (String taskID : commonIds) {
-          newTasks.remove(taskID);
-          oldTasks.remove(taskID);
-        }
-        Collection<Task> addedTasks = newTasks.values();
-        Collection<Task> removedTasks = oldTasks.values();
-
-        // Clean up removed Tasks
-        for (Task task : removedTasks) {
-          removeTaskInternal(task.getId());
-        }
-
-        // Add newly Added tasks to the queue
-        for (Task task : addedTasks) {
-          addTaskInternal(task);
-        }
-
-        log.info(
-            "Synced %d tasks from storage (%d tasks added, %d tasks removed).",
-            tasksSynced,
-            addedTasks.size(),
-            removedTasks.size()
-        );
-        requestManagement();
-      } else {
-        log.info("Not active. Skipping storage sync.");
-      }
-    }
-    catch (Exception e) {
-      log.warn(e, "Failed to sync tasks from storage!");
-      throw new RuntimeException(e);
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  private static Map<String, Task> toTaskIDMap(List<Task> taskList)
-  {
-    Map<String, Task> rv = new HashMap<>();
-    for (Task task : taskList) {
-      rv.put(task.getId(), task);
-    }
-    return rv;
-  }
-
-  private Map<String, Long> getDeltaValues(Map<String, Long> total, Map<String, Long> prev)
-  {
-    return total.entrySet()
-                .stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L)));
-  }
+    final Map<String, Task> newTasks = taskStorage.getActiveTasks().stream().collect(
+        Collectors.toMap(Task::getId, Function.identity())
+    );
+    final Map<String, Task> oldTasks = new HashMap<>(tasks);
+
+    // Calculate differences on IDs instead of Task Objects.
+    Set<String> commonIds = Sets.intersection(newTasks.keySet(), oldTasks.keySet());
+    for (String taskId : commonIds) {
+      newTasks.remove(taskId);
+      oldTasks.remove(taskId);
+    }
+    Collection<Task> addedTasks = newTasks.values();
+    Collection<Task> removedTasks = oldTasks.values();
+
+    // Add new tasks and clean up removed tasks
+    addedTasks.forEach(this::addTaskInternal);
+    removedTasks.forEach(this::removeTaskInternal);
+    log.info(

Review Comment:
   But yes, it does raise a good question about the rest of the workflow.
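   For reference, the ID-based diff in `syncFromStorage()` can be computed on copies using only the JDK, as in the hypothetical sketch below (`TaskSyncDiff` is an illustrative name). Two details in the quoted new code may be worth a look. First, Guava's `Sets.intersection` returns a view backed by both key sets, so removing from `newTasks` and `oldTasks` while iterating it can throw a `ConcurrentModificationException`. The removed code avoided this by copying the view with `Sets.newHashSet(...)` first. Second, `newTasks.size()` is read after the common IDs are removed, so the "Synced [%d] tasks" count equals the added count rather than the total. The removed code captured the total up front as `tasksSynced`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical result of comparing tasks in storage with tasks in memory, keyed by task ID.
final class TaskSyncDiff<T>
{
  final Map<String, T> added;
  final Map<String, T> removed;
  final int totalSynced;

  private TaskSyncDiff(Map<String, T> added, Map<String, T> removed, int totalSynced)
  {
    this.added = added;
    this.removed = removed;
    this.totalSynced = totalSynced;
  }

  static <V> TaskSyncDiff<V> compute(Map<String, V> fromStorage, Map<String, V> inMemory)
  {
    // Work on copies: inputs are never mutated, and nothing is removed from a
    // collection while a view backed by it is being iterated
    final Map<String, V> added = new HashMap<>(fromStorage);
    added.keySet().removeAll(inMemory.keySet());
    final Map<String, V> removed = new HashMap<>(inMemory);
    removed.keySet().removeAll(fromStorage.keySet());
    // Capture the total before filtering so a log line can report all synced tasks
    return new TaskSyncDiff<>(added, removed, fromStorage.size());
  }

  public static void main(String[] args)
  {
    final Map<String, String> storage = new HashMap<>();
    storage.put("a", "taskA");
    storage.put("b", "taskB");
    storage.put("c", "taskC");
    final Map<String, String> memory = new HashMap<>();
    memory.put("b", "taskB");
    memory.put("c", "taskC");
    memory.put("d", "taskD");
    final TaskSyncDiff<String> diff = TaskSyncDiff.compute(storage, memory);
    System.out.println(diff.added.keySet() + " " + diff.removed.keySet() + " " + diff.totalSynced);
    // prints "[a] [d] 3"
  }
}
```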



[GitHub] [druid] jasonk000 commented on a diff in pull request #14293: Remove `giant` lock from Overlord TaskQueue

Posted by "jasonk000 (via GitHub)" <gi...@apache.org>.
jasonk000 commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1195776038


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +153,248 @@ public TaskQueue(
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared after notifyStatus is processed
+    for (Task task : tasksToFail) {
+      for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+        taskStorage.removeLock(task.getId(), lock);
       }
-      managerExec.submit(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              while (true) {
-                try {
-                  manage();
-                  break;
-                }
-                catch (InterruptedException e) {
-                  log.info("Interrupted, exiting!");
-                  break;
-                }
-                catch (Exception e) {
-                  final long restartDelay = config.getRestartDelay().getMillis();
-                  log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit();
-                  try {
-                    Thread.sleep(restartDelay);
-                  }
-                  catch (InterruptedException e2) {
-                    log.info("Interrupted, exiting!");
-                    break;
-                  }
-                }
-              }
+    }
+    log.info("Cleaned up [%d] tasks which failed to reacquire locks.", tasksToFail.size());
+
+    // Submit task management job
+    managerExec.submit(
+        () -> {
+          log.info("Beginning task management in [%s].", config.getStartDelay());
+          long startDelayMillis = config.getStartDelay().getMillis();
+          while (active) {
+            try {
+              Thread.sleep(startDelayMillis);
+              runTaskManagement();
+            }
+            catch (InterruptedException e) {
+              log.info("Interrupted, stopping task management.");
+              break;
+            }
+            catch (Exception e) {
+              startDelayMillis = config.getRestartDelay().getMillis();
+              log.makeAlert(e, "Failed to manage").addData("restartDelay", startDelayMillis).emit();
             }
           }
-      );
-      ScheduledExecutors.scheduleAtFixedRate(
-          storageSyncExec,
-          config.getStorageSyncRate(),
-          new Callable<ScheduledExecutors.Signal>()
-          {
-            @Override
-            public ScheduledExecutors.Signal call()
-            {
-              try {
-                syncFromStorage();
-              }
-              catch (Exception e) {
-                if (active) {
-                  log.makeAlert(e, "Failed to sync with storage").emit();
-                }
-              }
-              if (active) {
-                return ScheduledExecutors.Signal.REPEAT;
-              } else {
-                return ScheduledExecutors.Signal.STOP;
-              }
+        }
+    );
+
+    // Schedule storage sync job
+    ScheduledExecutors.scheduleAtFixedRate(
+        storageSyncExec,
+        config.getStorageSyncRate(),
+        () -> {
+          if (!active) {
+            log.info("Stopping storage sync as TaskQueue has been stopped");
+            return ScheduledExecutors.Signal.STOP;
+          }
+
+          try {
+            syncFromStorage();
+          }
+          catch (Exception e) {
+            if (active) {
+              log.makeAlert(e, "Failed to sync with storage").emit();
             }
           }
-      );
-      requestManagement();
-      // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
-      // This is called after requesting management as locks need to be cleared after notifyStatus is processed
-      for (Task task : tasksToFail) {
-        for (TaskLock lock : taskStorage.getLocks(task.getId())) {
-          taskStorage.removeLock(task.getId(), lock);
+
+          return active ? ScheduledExecutors.Signal.REPEAT : ScheduledExecutors.Signal.STOP;
         }
-      }
-    }
-    finally {
-      giant.unlock();
-    }
+    );
   }
 
   /**
    * Shuts down the queue.
    */
   @LifecycleStop
-  public void stop()
+  public synchronized void stop()
   {
-    giant.lock();
-
-    try {
-      tasks.clear();
-      taskFutures.clear();
-      active = false;
-      managerExec.shutdownNow();
-      storageSyncExec.shutdownNow();
-      requestManagement();
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  public boolean isActive()
-  {
-    return active;
+    active = false;
+    tasks.clear();
+    submittedTaskIds.clear();
+    recentlyCompletedTaskIds.clear();
+    managerExec.shutdownNow();
+    storageSyncExec.shutdownNow();
+    requestManagement();
   }
 
   /**
    * Request management from the management thread. Non-blocking.

Review Comment:
   Technically, this is now blocking, though not for long.
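   For comparison, the removed approach called `managementMayBeNecessary.offer(this)`, followed by a timed `poll` and `clear()`. It can be sketched as below, assuming a capacity-1 `ArrayBlockingQueue`. The field's declaration is not shown in this hunk, and `QueueManagementSignal` is a hypothetical name. `offer()` returns immediately instead of waiting for space. However, `ArrayBlockingQueue` still takes its internal lock briefly, so neither version is strictly lock-free. The main difference is that the new `synchronized` block may wait for whichever thread currently holds the monitor.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the queue-as-notification approach; names are illustrative.
final class QueueManagementSignal
{
  // Assumed capacity of 1: a single pending token already means "manage soon"
  private final BlockingQueue<Object> managementMayBeNecessary = new ArrayBlockingQueue<>(1);

  /** Never waits for space: returns false if a request is already pending. */
  boolean requestManagement()
  {
    return managementMayBeNecessary.offer(this);
  }

  /** Returns true if woken by a request, false if the timeout elapsed. */
  boolean awaitManagement(long timeout, TimeUnit unit) throws InterruptedException
  {
    final boolean requested = managementMayBeNecessary.poll(timeout, unit) != null;
    // Discard requests that piled up meanwhile; the next management pass covers them
    managementMayBeNecessary.clear();
    return requested;
  }

  public static void main(String[] args) throws InterruptedException
  {
    final QueueManagementSignal signal = new QueueManagementSignal();
    System.out.println(signal.requestManagement()); // prints true
    System.out.println(signal.requestManagement()); // prints false (already pending)
    System.out.println(signal.awaitManagement(10, TimeUnit.MILLISECONDS)); // prints true
    System.out.println(signal.awaitManagement(10, TimeUnit.MILLISECONDS)); // prints false
  }
}
```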



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -749,131 +651,69 @@ private void handleStatus(final TaskStatus status)
           }
         }
     );
-    return statusFuture;
   }
 
   /**
-   * Resync the contents of this task queue with our storage facility. Useful to make sure our in-memory state
-   * corresponds to the storage facility even if the latter is manually modified.
+   * Resync the contents of this task queue with our storage facility.
+   * Useful to make sure our in-memory state corresponds to the storage facility
+   * even if the latter is manually modified.
+   * <p>
+   * This method must be called only when queue is {@link #active}, except when
+   * starting up.
    */
-  private void syncFromStorage()
+  private synchronized void syncFromStorage()
   {
-    giant.lock();
-
-    try {
-      if (active) {
-        final Map<String, Task> newTasks = toTaskIDMap(taskStorage.getActiveTasks());
-        final int tasksSynced = newTasks.size();
-        final Map<String, Task> oldTasks = new HashMap<>(tasks);
-
-        // Calculate differences on IDs instead of Task Objects.
-        Set<String> commonIds = Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet()));
-        for (String taskID : commonIds) {
-          newTasks.remove(taskID);
-          oldTasks.remove(taskID);
-        }
-        Collection<Task> addedTasks = newTasks.values();
-        Collection<Task> removedTasks = oldTasks.values();
-
-        // Clean up removed Tasks
-        for (Task task : removedTasks) {
-          removeTaskInternal(task.getId());
-        }
-
-        // Add newly Added tasks to the queue
-        for (Task task : addedTasks) {
-          addTaskInternal(task);
-        }
-
-        log.info(
-            "Synced %d tasks from storage (%d tasks added, %d tasks removed).",
-            tasksSynced,
-            addedTasks.size(),
-            removedTasks.size()
-        );
-        requestManagement();
-      } else {
-        log.info("Not active. Skipping storage sync.");
-      }
-    }
-    catch (Exception e) {
-      log.warn(e, "Failed to sync tasks from storage!");
-      throw new RuntimeException(e);
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  private static Map<String, Task> toTaskIDMap(List<Task> taskList)
-  {
-    Map<String, Task> rv = new HashMap<>();
-    for (Task task : taskList) {
-      rv.put(task.getId(), task);
-    }
-    return rv;
-  }
-
-  private Map<String, Long> getDeltaValues(Map<String, Long> total, Map<String, Long> prev)
-  {
-    return total.entrySet()
-                .stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L)));
-  }
+    final Map<String, Task> newTasks = taskStorage.getActiveTasks().stream().collect(
+        Collectors.toMap(Task::getId, Function.identity())
+    );
+    final Map<String, Task> oldTasks = new HashMap<>(tasks);
+
+    // Calculate differences on IDs instead of Task Objects.
+    Set<String> commonIds = Sets.intersection(newTasks.keySet(), oldTasks.keySet());
+    for (String taskId : commonIds) {
+      newTasks.remove(taskId);
+      oldTasks.remove(taskId);
+    }
+    Collection<Task> addedTasks = newTasks.values();
+    Collection<Task> removedTasks = oldTasks.values();
+
+    // Add new tasks and clean up removed tasks
+    addedTasks.forEach(this::addTaskInternal);
+    removedTasks.forEach(this::removeTaskInternal);
+    log.info(
+        "Synced [%d] tasks from storage. Added [%d] tasks, removed [%d] tasks.",
+        newTasks.size(), addedTasks.size(), removedTasks.size()
+    );
 
-  public Map<String, Long> getSuccessfulTaskCount()
-  {
-    Map<String, Long> total = CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get);
-    synchronized (totalSuccessfulTaskCount) {
-      Map<String, Long> delta = getDeltaValues(total, prevTotalSuccessfulTaskCount);
-      prevTotalSuccessfulTaskCount = total;
-      return delta;
-    }
+    requestManagement();
   }
 
-  public Map<String, Long> getFailedTaskCount()
+  public Map<String, Long> getAndResetSuccessfulTaskCounts()
   {
-    Map<String, Long> total = CollectionUtils.mapValues(totalFailedTaskCount, AtomicLong::get);
-    synchronized (totalFailedTaskCount) {
-      Map<String, Long> delta = getDeltaValues(total, prevTotalFailedTaskCount);
-      prevTotalFailedTaskCount = total;
-      return delta;
-    }
+    Map<String, Long> total = new HashMap<>(datasourceToSuccessfulTaskCount);
+    datasourceToSuccessfulTaskCount.clear();

Review Comment:
   This and the following functions have a small race: if the ConcurrentHashMap (CHM) is modified between the copy and the call to `clear()`, those updates are lost. There are some alternatives, such as iterating over the entries and using `replaceAll`, or calling `remove()` or `merge()` in a loop, that would allow a more atomic unload of the CHM. Alternatively, the map values could be `AtomicLong`s that are CAS-modified to zero during the unload.
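   A minimal sketch of the "`remove()` in a loop" alternative, assuming the counts live in a `ConcurrentHashMap<String, Long>` that is updated with `merge()`. The `TaskCounts` class and the `incrementSuccessful` helper are hypothetical. Each `remove(key)` atomically takes the current value, so an increment either lands in the returned snapshot or creates a fresh entry that the next call reports; nothing falls into the gap between a copy and `clear()`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of per-datasource counters that can be drained without losing updates. */
class TaskCounts
{
  private final ConcurrentHashMap<String, Long> datasourceToSuccessfulTaskCount = new ConcurrentHashMap<>();

  /** Atomically increments the count for a datasource. */
  void incrementSuccessful(String datasource)
  {
    datasourceToSuccessfulTaskCount.merge(datasource, 1L, Long::sum);
  }

  /** Returns the counts accumulated since the previous call and resets them. */
  Map<String, Long> getAndResetSuccessfulTaskCounts()
  {
    final Map<String, Long> snapshot = new HashMap<>();
    // The key set view is weakly consistent, so removing entries while iterating is safe.
    for (String datasource : datasourceToSuccessfulTaskCount.keySet()) {
      final Long count = datasourceToSuccessfulTaskCount.remove(datasource);
      if (count != null) {
        // merge() also covers the unlikely case of the same key being visited twice.
        snapshot.merge(datasource, count, Long::sum);
      }
    }
    return snapshot;
  }
}
```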



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +153,248 @@ public TaskQueue(
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared after notifyStatus is processed

Review Comment:
   Observation: this is an interesting comment, given that there is no guarantee that management has run by now. I wonder if it's stale, or if there is an issue here? ..., since it has the same behaviour as the previous code.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -652,35 +569,21 @@ private void notifyStatus(final Task task, final TaskStatus taskStatus, String r
     catch (Throwable e) {
       // If task runner shutdown fails, continue with the task shutdown routine. We'll come back and try to
       // shut it down again later in manageInternalPostCritical, once it's removed from the "tasks" map.
-      log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId());
+      log.warn(e, "TaskRunner failed to cleanup task [%s] after completion", task.getId());
     }
 
-    // Critical section: remove this task from all of our tracking data structures.
-    giant.lock();
-    try {
-      if (removeTaskInternal(task.getId())) {

Review Comment:
   re: earlier note about possibly re-introducing #12901.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -749,131 +651,69 @@ private void handleStatus(final TaskStatus status)
           }
         }
     );
-    return statusFuture;
   }
 
   /**
-   * Resync the contents of this task queue with our storage facility. Useful to make sure our in-memory state
-   * corresponds to the storage facility even if the latter is manually modified.
+   * Resync the contents of this task queue with our storage facility.
+   * Useful to make sure our in-memory state corresponds to the storage facility
+   * even if the latter is manually modified.
+   * <p>
+   * This method must be called only when queue is {@link #active}, except when
+   * starting up.
    */
-  private void syncFromStorage()
+  private synchronized void syncFromStorage()
   {
-    giant.lock();
-
-    try {
-      if (active) {
-        final Map<String, Task> newTasks = toTaskIDMap(taskStorage.getActiveTasks());
-        final int tasksSynced = newTasks.size();
-        final Map<String, Task> oldTasks = new HashMap<>(tasks);
-
-        // Calculate differences on IDs instead of Task Objects.
-        Set<String> commonIds = Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet()));
-        for (String taskID : commonIds) {
-          newTasks.remove(taskID);
-          oldTasks.remove(taskID);
-        }
-        Collection<Task> addedTasks = newTasks.values();
-        Collection<Task> removedTasks = oldTasks.values();
-
-        // Clean up removed Tasks
-        for (Task task : removedTasks) {
-          removeTaskInternal(task.getId());
-        }
-
-        // Add newly Added tasks to the queue
-        for (Task task : addedTasks) {
-          addTaskInternal(task);
-        }
-
-        log.info(
-            "Synced %d tasks from storage (%d tasks added, %d tasks removed).",
-            tasksSynced,
-            addedTasks.size(),
-            removedTasks.size()
-        );
-        requestManagement();
-      } else {
-        log.info("Not active. Skipping storage sync.");
-      }
-    }
-    catch (Exception e) {
-      log.warn(e, "Failed to sync tasks from storage!");
-      throw new RuntimeException(e);
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  private static Map<String, Task> toTaskIDMap(List<Task> taskList)
-  {
-    Map<String, Task> rv = new HashMap<>();
-    for (Task task : taskList) {
-      rv.put(task.getId(), task);
-    }
-    return rv;
-  }
-
-  private Map<String, Long> getDeltaValues(Map<String, Long> total, Map<String, Long> prev)
-  {
-    return total.entrySet()
-                .stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L)));
-  }
+    final Map<String, Task> newTasks = taskStorage.getActiveTasks().stream().collect(
+        Collectors.toMap(Task::getId, Function.identity())
+    );
+    final Map<String, Task> oldTasks = new HashMap<>(tasks);
+
+    // Calculate differences on IDs instead of Task Objects.
+    Set<String> commonIds = Sets.intersection(newTasks.keySet(), oldTasks.keySet());
+    for (String taskId : commonIds) {
+      newTasks.remove(taskId);
+      oldTasks.remove(taskId);
+    }
+    Collection<Task> addedTasks = newTasks.values();
+    Collection<Task> removedTasks = oldTasks.values();
+
+    // Add new tasks and clean up removed tasks
+    addedTasks.forEach(this::addTaskInternal);
+    removedTasks.forEach(this::removeTaskInternal);

Review Comment:
   :+1: much neater!



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +153,248 @@ public TaskQueue(
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared after notifyStatus is processed
+    for (Task task : tasksToFail) {
+      for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+        taskStorage.removeLock(task.getId(), lock);
       }
-      managerExec.submit(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              while (true) {
-                try {
-                  manage();
-                  break;
-                }
-                catch (InterruptedException e) {
-                  log.info("Interrupted, exiting!");
-                  break;
-                }
-                catch (Exception e) {
-                  final long restartDelay = config.getRestartDelay().getMillis();
-                  log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit();
-                  try {
-                    Thread.sleep(restartDelay);
-                  }
-                  catch (InterruptedException e2) {
-                    log.info("Interrupted, exiting!");
-                    break;
-                  }
-                }
-              }
+    }
+    log.info("Cleaned up [%d] tasks which failed to reacquire locks.", tasksToFail.size());
+
+    // Submit task management job
+    managerExec.submit(
+        () -> {
+          log.info("Beginning task management in [%s].", config.getStartDelay());
+          long startDelayMillis = config.getStartDelay().getMillis();
+          while (active) {
+            try {
+              Thread.sleep(startDelayMillis);
+              runTaskManagement();
+            }
+            catch (InterruptedException e) {
+              log.info("Interrupted, stopping task management.");
+              break;
+            }
+            catch (Exception e) {
+              startDelayMillis = config.getRestartDelay().getMillis();
+              log.makeAlert(e, "Failed to manage").addData("restartDelay", startDelayMillis).emit();
             }
           }
-      );
-      ScheduledExecutors.scheduleAtFixedRate(
-          storageSyncExec,
-          config.getStorageSyncRate(),
-          new Callable<ScheduledExecutors.Signal>()
-          {
-            @Override
-            public ScheduledExecutors.Signal call()
-            {
-              try {
-                syncFromStorage();
-              }
-              catch (Exception e) {
-                if (active) {
-                  log.makeAlert(e, "Failed to sync with storage").emit();
-                }
-              }
-              if (active) {
-                return ScheduledExecutors.Signal.REPEAT;
-              } else {
-                return ScheduledExecutors.Signal.STOP;
-              }
+        }
+    );
+
+    // Schedule storage sync job
+    ScheduledExecutors.scheduleAtFixedRate(
+        storageSyncExec,
+        config.getStorageSyncRate(),
+        () -> {
+          if (!active) {
+            log.info("Stopping storage sync as TaskQueue has been stopped");
+            return ScheduledExecutors.Signal.STOP;
+          }
+
+          try {
+            syncFromStorage();
+          }
+          catch (Exception e) {
+            if (active) {
+              log.makeAlert(e, "Failed to sync with storage").emit();
             }
           }
-      );
-      requestManagement();
-      // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
-      // This is called after requesting management as locks need to be cleared after notifyStatus is processed
-      for (Task task : tasksToFail) {
-        for (TaskLock lock : taskStorage.getLocks(task.getId())) {
-          taskStorage.removeLock(task.getId(), lock);
+
+          return active ? ScheduledExecutors.Signal.REPEAT : ScheduledExecutors.Signal.STOP;
         }
-      }
-    }
-    finally {
-      giant.unlock();
-    }
+    );
   }
 
   /**
    * Shuts down the queue.
    */
   @LifecycleStop
-  public void stop()
+  public synchronized void stop()
   {
-    giant.lock();
-
-    try {
-      tasks.clear();
-      taskFutures.clear();
-      active = false;
-      managerExec.shutdownNow();
-      storageSyncExec.shutdownNow();
-      requestManagement();
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  public boolean isActive()
-  {
-    return active;
+    active = false;
+    tasks.clear();
+    submittedTaskIds.clear();
+    recentlyCompletedTaskIds.clear();
+    managerExec.shutdownNow();
+    storageSyncExec.shutdownNow();
+    requestManagement();
   }
 
   /**
    * Request management from the management thread. Non-blocking.
-   *
-   * Other callers (such as notifyStatus) should trigger activity on the
-   * TaskQueue thread by requesting management here.
+   * <p>
+   * Callers (such as notifyStatus) can trigger task management by calling
+   * this method.
    */
-  void requestManagement()
+  private void requestManagement()
   {
-    // use a BlockingQueue since the offer/poll/wait behaviour is simple
-    // and very easy to reason about
-
-    // the request has to be offer (non blocking), since someone might request
-    // while already holding giant lock
-
-    // do not care if the item fits into the queue:
-    // if the queue is already full, request has been triggered anyway
-    managementMayBeNecessary.offer(this);
+    synchronized (managementRequested) {
+      managementRequested.set(true);
+      managementRequested.notify();
+    }
   }
 
   /**
-   * Await for an event to manage.
-   *
-   * This should only be called from the management thread to wait for activity.
+   * Waits for a management request to be triggered by another thread.
    *
-   * @param nanos
-   * @throws InterruptedException
+   * @throws InterruptedException if the thread is interrupted while waiting.
    */
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "using queue as notification mechanism, result has no value")
-  void awaitManagementNanos(long nanos) throws InterruptedException
+  private void awaitManagementRequest() throws InterruptedException
   {
     // mitigate a busy loop, it can get pretty busy when there are a lot of start/stops
     try {
-      Thread.sleep(MIN_WAIT_TIME_MS);
+      Thread.sleep(MIN_WAIT_TIME_MILLIS);
     }
     catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
 
-    // wait for an item, if an item arrives (or is already available), complete immediately
-    // (does not actually matter what the item is)
-    managementMayBeNecessary.poll(nanos - (TimeUnit.MILLISECONDS.toNanos(MIN_WAIT_TIME_MS)), TimeUnit.NANOSECONDS);
-
-    // there may have been multiple requests, clear them all
-    managementMayBeNecessary.clear();
+    // Wait for management to be requested
+    synchronized (managementRequested) {
+      while (!managementRequested.get()) {
+        managementRequested.wait(MANAGEMENT_WAIT_TIMEOUT_MILLIS - MIN_WAIT_TIME_MILLIS);
+      }
+      managementRequested.compareAndSet(true, false);
+    }
   }
 
   /**
    * Main task runner management loop. Meant to run forever, or, at least until we're stopped.
    */
-  private void manage() throws InterruptedException
+  private void runTaskManagement() throws InterruptedException
   {
-    log.info("Beginning management in %s.", config.getStartDelay());
-    Thread.sleep(config.getStartDelay().getMillis());
-
     // Ignore return value- we'll get the IDs and futures from getKnownTasks later.
     taskRunner.restore();
 
     while (active) {
-      manageInternal();
-
-      // awaitNanos because management may become necessary without this condition signalling,
-      // due to e.g. tasks becoming ready when other folks mess with the TaskLockbox.
-      awaitManagementNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS);
+      manageTasks();
+      awaitManagementRequest();
     }
   }
 
   @VisibleForTesting
-  void manageInternal()
+  void manageTasks()
   {
-    Set<String> knownTaskIds = new HashSet<>();
-    Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new HashMap<>();
-
-    giant.lock();
-
-    try {
-      manageInternalCritical(knownTaskIds, runnerTaskFutures);
-    }
-    finally {
-      giant.unlock();
-    }
-
-    manageInternalPostCritical(knownTaskIds, runnerTaskFutures);
+    runReadyTasks();
+    killUnknownTasks();
   }
 
 
   /**
-   * Management loop critical section tasks.
-   *
-   * @param knownTaskIds will be modified - filled with known task IDs
-   * @param runnerTaskFutures will be modified - filled with futures related to getting the running tasks
+   * Submits ready tasks to the TaskRunner.
+   * <p>
+   * This method should be called only by the management thread.
    */
-  @GuardedBy("giant")
-  private void manageInternalCritical(
-      final Set<String> knownTaskIds,
-      final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures
-  )
+  private synchronized void runReadyTasks()
   {
     // Task futures available from the taskRunner
+    final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new HashMap<>();
     for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) {
-      if (!recentlyCompletedTasks.contains(workItem.getTaskId())) {
-        // Don't do anything with tasks that have recently finished; notifyStatus will handle it.
+      if (!recentlyCompletedTaskIds.contains(workItem.getTaskId())) {
+        // Don't do anything with recently completed tasks; notifyStatus will handle it.
         runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
       }
     }
+
     // Attain futures for all active tasks (assuming they are ready to run).
-    // Copy tasks list, as notifyStatus may modify it.
-    for (final Task task : ImmutableList.copyOf(tasks.values())) {
-      if (recentlyCompletedTasks.contains(task.getId())) {
-        // Don't do anything with tasks that have recently finished; notifyStatus will handle it.
-        continue;
+    for (final String taskId : Lists.newArrayList(activeTaskIdQueue)) {
+      final Task task = tasks.get(taskId);
+      if (task == null || recentlyCompletedTaskIds.contains(taskId)) {
+        // Don't do anything for unknown tasks or recently completed tasks
+      } else if (submittedTaskIds.contains(taskId)) {
+        // Re-trigger execution of pending task to avoid unnecessary delays
+        // see https://github.com/apache/druid/pull/6991
+        if (isTaskPending(task)) {
+          taskRunner.run(task);
+        }
+      } else if (runnerTaskFutures.containsKey(taskId)) {
+        attachCallbacks(task, runnerTaskFutures.get(taskId));
+        submittedTaskIds.add(taskId);
+      } else if (isTaskReady(task)) {
+        log.info("Asking taskRunner to run ready task [%s].", taskId);
+        attachCallbacks(task, taskRunner.run(task));
+        submittedTaskIds.add(taskId);
+      } else {
+        // Release all locks (possibly acquired by task.isReady()) if task is not ready
+        taskLockbox.unlockAll(task);
       }
+    }
+  }
 
-      knownTaskIds.add(task.getId());
-
-      if (!taskFutures.containsKey(task.getId())) {
-        final ListenableFuture<TaskStatus> runnerTaskFuture;
-        if (runnerTaskFutures.containsKey(task.getId())) {
-          runnerTaskFuture = runnerTaskFutures.get(task.getId());
-        } else {
-          // Task should be running, so run it.
-          final boolean taskIsReady;
-          try {
-            taskIsReady = task.isReady(taskActionClientFactory.create(task));
-          }
-          catch (Exception e) {
-            log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
-            final String errorMessage;
-            if (e instanceof MaxAllowedLocksExceededException) {
-              errorMessage = e.getMessage();
-            } else {
-              errorMessage = "Failed while waiting for the task to be ready to run. "
-                                          + "See overlord logs for more details.";
-            }
-            notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage);
-            continue;
-          }
-          if (taskIsReady) {
-            log.info("Asking taskRunner to run: %s", task.getId());
-            runnerTaskFuture = taskRunner.run(task);
-          } else {
-            // Task.isReady() can internally lock intervals or segments.
-            // We should release them if the task is not ready.
-            taskLockbox.unlockAll(task);
-            continue;
-          }
-        }
-        taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));
-      } else if (isTaskPending(task)) {
-        // if the taskFutures contain this task and this task is pending, also let the taskRunner
-        // to run it to guarantee it will be assigned to run
-        // see https://github.com/apache/druid/pull/6991
-        taskRunner.run(task);
+  private boolean isTaskReady(Task task)
+  {
+    try {
+      return task.isReady(taskActionClientFactory.create(task));
+    }
+    catch (Exception e) {
+      log.warn(e, "Error while checking if task [%s] is ready to run.", task.getId());
+      final String errorMessage;
+      if (e instanceof MaxAllowedLocksExceededException) {
+        errorMessage = e.getMessage();
+      } else {
+        errorMessage = "Failed while waiting for the task to be ready to run. "
+                       + "See overlord logs for more details.";
       }
+      notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage);
+      return false;
     }
   }
 
-  @VisibleForTesting
-  private void manageInternalPostCritical(
-      final Set<String> knownTaskIds,
-      final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures
-  )
+  /**
+   * Kills tasks not present in the set of known tasks.
+   */
+  private void killUnknownTasks()
   {
-    // Kill tasks that shouldn't be running
-    final Set<String> tasksToKill = Sets.difference(runnerTaskFutures.keySet(), knownTaskIds);
-    if (!tasksToKill.isEmpty()) {
-      log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());
-
-      // On large installations running several thousands of tasks,
-      // concatenating the list of known task ids can be compupationally expensive.
-      final boolean logKnownTaskIds = log.isDebugEnabled();
-      final String reason = logKnownTaskIds
-              ? StringUtils.format("Task is not in knownTaskIds[%s]", knownTaskIds)
-              : "Task is not in knownTaskIds";
-
-      for (final String taskId : tasksToKill) {
-        try {
-          taskRunner.shutdown(taskId, reason);
-        }
-        catch (Exception e) {
-          log.warn(e, "TaskRunner failed to clean up task: %s", taskId);
-        }
+    final Set<String> knownTaskIds = tasks.keySet();

Review Comment:
   I'm a little wary of reintroducing this bug (https://github.com/apache/druid/pull/12901) in the case where `tasks` has changed. Can you check it over? It seems we might have an issue if `tasks` changes here. That said, you've covered most of these cases with `synchronized`. Could you check `add()` and `removeTaskInternal()`? I think these operate on `tasks` without being synchronized, which might lead to a race.
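   For reference, the removed `manageInternalCritical()`/`manageInternalPostCritical()` split captured both the runner's task IDs and the queue's known task IDs inside one critical section, and only computed and killed the difference afterwards. A minimal sketch of that pattern, assuming `add`/`remove` synchronize on the same monitor; `KillCandidateFinder` and its methods are hypothetical, and tasks are reduced to their IDs:

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical sketch: find runner tasks unknown to the queue from a consistent snapshot. */
class KillCandidateFinder
{
  private final Set<String> taskIds = ConcurrentHashMap.newKeySet();

  synchronized void addTask(String taskId)
  {
    taskIds.add(taskId);
  }

  synchronized void removeTask(String taskId)
  {
    taskIds.remove(taskId);
  }

  /**
   * Both snapshots are taken while holding the monitor used by addTask/removeTask,
   * so neither method can change the known IDs between the two reads. The
   * difference is computed on private copies, outside the critical section.
   */
  Set<String> findTasksToKill(Supplier<Collection<String>> runnerTaskIds)
  {
    final Set<String> runnerIds;
    final Set<String> knownIds;
    synchronized (this) {
      runnerIds = new HashSet<>(runnerTaskIds.get());
      knownIds = new HashSet<>(taskIds);
    }
    runnerIds.removeAll(knownIds);
    return runnerIds;
  }
}
```

   Unlike a live `tasks.keySet()` view, the returned set cannot change underneath the caller while the shutdown calls are being made.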



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -749,131 +651,69 @@ private void handleStatus(final TaskStatus status)
           }
         }
     );
-    return statusFuture;
   }
 
   /**
-   * Resync the contents of this task queue with our storage facility. Useful to make sure our in-memory state
-   * corresponds to the storage facility even if the latter is manually modified.
+   * Resync the contents of this task queue with our storage facility.
+   * Useful to make sure our in-memory state corresponds to the storage facility
+   * even if the latter is manually modified.
+   * <p>
+   * This method must be called only when queue is {@link #active}, except when
+   * starting up.
    */
-  private void syncFromStorage()
+  private synchronized void syncFromStorage()
   {
-    giant.lock();
-
-    try {
-      if (active) {
-        final Map<String, Task> newTasks = toTaskIDMap(taskStorage.getActiveTasks());
-        final int tasksSynced = newTasks.size();
-        final Map<String, Task> oldTasks = new HashMap<>(tasks);
-
-        // Calculate differences on IDs instead of Task Objects.
-        Set<String> commonIds = Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet()));
-        for (String taskID : commonIds) {
-          newTasks.remove(taskID);
-          oldTasks.remove(taskID);
-        }
-        Collection<Task> addedTasks = newTasks.values();
-        Collection<Task> removedTasks = oldTasks.values();
-
-        // Clean up removed Tasks
-        for (Task task : removedTasks) {
-          removeTaskInternal(task.getId());
-        }
-
-        // Add newly Added tasks to the queue
-        for (Task task : addedTasks) {
-          addTaskInternal(task);
-        }
-
-        log.info(
-            "Synced %d tasks from storage (%d tasks added, %d tasks removed).",
-            tasksSynced,
-            addedTasks.size(),
-            removedTasks.size()
-        );
-        requestManagement();
-      } else {
-        log.info("Not active. Skipping storage sync.");
-      }
-    }
-    catch (Exception e) {
-      log.warn(e, "Failed to sync tasks from storage!");
-      throw new RuntimeException(e);
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  private static Map<String, Task> toTaskIDMap(List<Task> taskList)
-  {
-    Map<String, Task> rv = new HashMap<>();
-    for (Task task : taskList) {
-      rv.put(task.getId(), task);
-    }
-    return rv;
-  }
-
-  private Map<String, Long> getDeltaValues(Map<String, Long> total, Map<String, Long> prev)
-  {
-    return total.entrySet()
-                .stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L)));
-  }
+    final Map<String, Task> newTasks = taskStorage.getActiveTasks().stream().collect(
+        Collectors.toMap(Task::getId, Function.identity())
+    );
+    final Map<String, Task> oldTasks = new HashMap<>(tasks);
+
+    // Calculate differences on IDs instead of Task Objects.
+    Set<String> commonIds = Sets.intersection(newTasks.keySet(), oldTasks.keySet());
+    for (String taskId : commonIds) {
+      newTasks.remove(taskId);
+      oldTasks.remove(taskId);
+    }
+    Collection<Task> addedTasks = newTasks.values();
+    Collection<Task> removedTasks = oldTasks.values();
+
+    // Add new tasks and clean up removed tasks
+    addedTasks.forEach(this::addTaskInternal);
+    removedTasks.forEach(this::removeTaskInternal);
+    log.info(

Review Comment:
   This existed before, but it might be wise to move as much as possible, including logging, out of the `synchronized` block and tighten it down a bit.
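   A minimal sketch of a tighter block, under these assumptions: `StorageSyncSketch`, `SyncResult` and the `Supplier` standing in for `taskStorage.getActiveTasks()` are hypothetical, and tasks are plain strings keyed by ID. The storage read and map updates stay inside the lock, as in the PR; only the summary is captured in an immutable object, and formatting/logging happen after the monitor is released.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.logging.Logger;

/** Hypothetical sketch: keep only the map updates inside the lock; log after releasing it. */
class StorageSyncSketch
{
  private static final Logger log = Logger.getLogger(StorageSyncSketch.class.getName());
  private final Map<String, String> tasks = new ConcurrentHashMap<>();

  /** Immutable summary of one sync, safe to use after the lock is released. */
  static final class SyncResult
  {
    final int synced;
    final int added;
    final int removed;

    SyncResult(int synced, int added, int removed)
    {
      this.synced = synced;
      this.added = added;
      this.removed = removed;
    }
  }

  SyncResult syncFromStorage(Supplier<Map<String, String>> activeTasksInStorage)
  {
    final SyncResult result;
    synchronized (this) {
      final Map<String, String> newTasks = activeTasksInStorage.get();
      final Set<String> removedIds = new HashSet<>(tasks.keySet());
      removedIds.removeAll(newTasks.keySet());
      int added = 0;
      for (Map.Entry<String, String> entry : newTasks.entrySet()) {
        if (tasks.putIfAbsent(entry.getKey(), entry.getValue()) == null) {
          added++;
        }
      }
      tasks.keySet().removeAll(removedIds);
      result = new SyncResult(newTasks.size(), added, removedIds.size());
    }
    // String formatting and logging no longer extend the time the monitor is held.
    log.info(String.format(
        "Synced [%d] tasks from storage. Added [%d] tasks, removed [%d] tasks.",
        result.synced, result.added, result.removed
    ));
    return result;
  }

  synchronized Set<String> taskIds()
  {
    return new HashSet<>(tasks.keySet());
  }
}
```

   Whether the metadata read itself can also move out of the lock depends on how `add()` is synchronized, which this sketch does not address.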



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -488,58 +426,56 @@ public boolean add(final Task task) throws EntryExistsException
     // Set forceTimeChunkLock before adding task spec to taskStorage, so that we can see always consistent task spec.
     task.addToContextIfAbsent(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockConfig.isForceTimeChunkLock());
     defaultTaskConfig.getContext().forEach(task::addToContextIfAbsent);
-    // Every task shuold use the lineage-based segment allocation protocol unless it is explicitly set to
+    // Every task should use the lineage-based segment allocation protocol unless it is explicitly set to
     // using the legacy protocol.
     task.addToContextIfAbsent(
         SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY,
         SinglePhaseParallelIndexTaskRunner.DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION
     );
 
-    giant.lock();
-
-    try {
-      Preconditions.checkState(active, "Queue is not active!");
-      Preconditions.checkNotNull(task, "task");
-      Preconditions.checkState(tasks.size() < config.getMaxSize(), "Too many tasks (max = %,d)", config.getMaxSize());
-
-      // If this throws with any sort of exception, including TaskExistsException, we don't want to
-      // insert the task into our queue. So don't catch it.
-      taskStorage.insert(task, TaskStatus.running(task.getId()));
-      addTaskInternal(task);
-      requestManagement();
-      return true;
-    }
-    finally {
-      giant.unlock();
-    }
+    // Do not add the task to queue if insert into metadata fails for any reason
+    taskStorage.insert(task, TaskStatus.running(task.getId()));
+    addTaskInternal(task);
+    requestManagement();
+    return true;
   }
 
-  @GuardedBy("giant")
+  /**
+   * Atomically adds this task to the TaskQueue.
+   */
   private void addTaskInternal(final Task task)
   {
-    final Task existingTask = tasks.putIfAbsent(task.getId(), task);
-
-    if (existingTask == null) {
-      taskLockbox.add(task);
-    } else if (!existingTask.equals(task)) {
-      throw new ISE("Cannot add task ID [%s] with same ID as task that has already been added", task.getId());
-    }
+    tasks.computeIfAbsent(
+        task.getId(),
+        taskId -> {

Review Comment:
   It seems the implementation here relies on an implicit lock taken by `ConcurrentHashMap` (CHM). It might be worth noting this, given that the behavior is specific to CHM, in case the implementation is switched in the future.
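The reviewer's point can be made concrete. `ConcurrentHashMap.computeIfAbsent` is documented to perform the entire invocation atomically and to apply the mapping function at most once per key. The default `Map.computeIfAbsent` makes no synchronization or atomicity guarantee. Below is a minimal sketch of the pattern. It assumes `String` task payloads and a hypothetical `lockboxRegistrations` list standing in for `TaskLockbox.add()`, and it does not reproduce the truncated body of the new `addTaskInternal`. Declaring the field as `ConcurrentHashMap` rather than `Map` is one way to keep the dependency visible.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

class AddTaskSketch
{
  // Deliberately typed as ConcurrentHashMap: the atomicity below depends on its
  // documented computeIfAbsent() contract, which the Map interface does not promise.
  private final ConcurrentHashMap<String, String> tasks = new ConcurrentHashMap<>();

  // Hypothetical stand-in for TaskLockbox.add(task).
  final List<String> lockboxRegistrations = new CopyOnWriteArrayList<>();

  void addTaskInternal(String taskId, String taskPayload)
  {
    final String existing = tasks.computeIfAbsent(
        taskId,
        id -> {
          // Applied at most once per key. Other updates to the map may block while this
          // runs, so keep it short and never update other mappings of `tasks` here.
          lockboxRegistrations.add(id);
          return taskPayload;
        }
    );
    if (!existing.equals(taskPayload)) {
      throw new IllegalStateException(
          "Cannot add task ID [" + taskId + "] with same ID as task that has already been added"
      );
    }
  }
}
```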



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +153,248 @@ public TaskQueue(
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared after notifyStatus is processed
+    for (Task task : tasksToFail) {
+      for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+        taskStorage.removeLock(task.getId(), lock);
       }
-      managerExec.submit(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              while (true) {
-                try {
-                  manage();
-                  break;
-                }
-                catch (InterruptedException e) {
-                  log.info("Interrupted, exiting!");
-                  break;
-                }
-                catch (Exception e) {
-                  final long restartDelay = config.getRestartDelay().getMillis();
-                  log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit();
-                  try {
-                    Thread.sleep(restartDelay);
-                  }
-                  catch (InterruptedException e2) {
-                    log.info("Interrupted, exiting!");
-                    break;
-                  }
-                }
-              }
+    }
+    log.info("Cleaned up [%d] tasks which failed to reacquire locks.", tasksToFail.size());
+
+    // Submit task management job
+    managerExec.submit(
+        () -> {
+          log.info("Beginning task management in [%s].", config.getStartDelay());
+          long startDelayMillis = config.getStartDelay().getMillis();
+          while (active) {
+            try {
+              Thread.sleep(startDelayMillis);
+              runTaskManagement();
+            }
+            catch (InterruptedException e) {
+              log.info("Interrupted, stopping task management.");
+              break;
+            }
+            catch (Exception e) {
+              startDelayMillis = config.getRestartDelay().getMillis();
+              log.makeAlert(e, "Failed to manage").addData("restartDelay", startDelayMillis).emit();
             }
           }
-      );
-      ScheduledExecutors.scheduleAtFixedRate(
-          storageSyncExec,
-          config.getStorageSyncRate(),
-          new Callable<ScheduledExecutors.Signal>()
-          {
-            @Override
-            public ScheduledExecutors.Signal call()
-            {
-              try {
-                syncFromStorage();
-              }
-              catch (Exception e) {
-                if (active) {
-                  log.makeAlert(e, "Failed to sync with storage").emit();
-                }
-              }
-              if (active) {
-                return ScheduledExecutors.Signal.REPEAT;
-              } else {
-                return ScheduledExecutors.Signal.STOP;
-              }
+        }
+    );
+
+    // Schedule storage sync job
+    ScheduledExecutors.scheduleAtFixedRate(
+        storageSyncExec,
+        config.getStorageSyncRate(),
+        () -> {
+          if (!active) {
+            log.info("Stopping storage sync as TaskQueue has been stopped");
+            return ScheduledExecutors.Signal.STOP;
+          }
+
+          try {
+            syncFromStorage();
+          }
+          catch (Exception e) {
+            if (active) {
+              log.makeAlert(e, "Failed to sync with storage").emit();
             }
           }
-      );
-      requestManagement();
-      // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
-      // This is called after requesting management as locks need to be cleared after notifyStatus is processed
-      for (Task task : tasksToFail) {
-        for (TaskLock lock : taskStorage.getLocks(task.getId())) {
-          taskStorage.removeLock(task.getId(), lock);
+
+          return active ? ScheduledExecutors.Signal.REPEAT : ScheduledExecutors.Signal.STOP;
         }
-      }
-    }
-    finally {
-      giant.unlock();
-    }
+    );
   }
 
   /**
    * Shuts down the queue.
    */
   @LifecycleStop
-  public void stop()
+  public synchronized void stop()
   {
-    giant.lock();
-
-    try {
-      tasks.clear();
-      taskFutures.clear();
-      active = false;
-      managerExec.shutdownNow();
-      storageSyncExec.shutdownNow();
-      requestManagement();
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  public boolean isActive()
-  {
-    return active;
+    active = false;
+    tasks.clear();
+    submittedTaskIds.clear();
+    recentlyCompletedTaskIds.clear();
+    managerExec.shutdownNow();
+    storageSyncExec.shutdownNow();
+    requestManagement();
   }
 
   /**
    * Request management from the management thread. Non-blocking.
-   *
-   * Other callers (such as notifyStatus) should trigger activity on the
-   * TaskQueue thread by requesting management here.
+   * <p>
+   * Callers (such as notifyStatus) can trigger task management by calling
+   * this method.
    */
-  void requestManagement()
+  private void requestManagement()
   {
-    // use a BlockingQueue since the offer/poll/wait behaviour is simple
-    // and very easy to reason about
-
-    // the request has to be offer (non blocking), since someone might request
-    // while already holding giant lock
-
-    // do not care if the item fits into the queue:
-    // if the queue is already full, request has been triggered anyway
-    managementMayBeNecessary.offer(this);
+    synchronized (managementRequested) {
+      managementRequested.set(true);
+      managementRequested.notify();
+    }
   }
 
   /**
-   * Await for an event to manage.
-   *
-   * This should only be called from the management thread to wait for activity.
+   * Waits for a management request to be triggered by another thread.
    *
-   * @param nanos
-   * @throws InterruptedException
+   * @throws InterruptedException if the thread is interrupted while waiting.
    */
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "using queue as notification mechanism, result has no value")
-  void awaitManagementNanos(long nanos) throws InterruptedException
+  private void awaitManagementRequest() throws InterruptedException
   {
     // mitigate a busy loop, it can get pretty busy when there are a lot of start/stops
     try {
-      Thread.sleep(MIN_WAIT_TIME_MS);
+      Thread.sleep(MIN_WAIT_TIME_MILLIS);
     }
     catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
 
-    // wait for an item, if an item arrives (or is already available), complete immediately
-    // (does not actually matter what the item is)
-    managementMayBeNecessary.poll(nanos - (TimeUnit.MILLISECONDS.toNanos(MIN_WAIT_TIME_MS)), TimeUnit.NANOSECONDS);
-
-    // there may have been multiple requests, clear them all
-    managementMayBeNecessary.clear();
+    // Wait for management to be requested
+    synchronized (managementRequested) {
+      while (!managementRequested.get()) {
+        managementRequested.wait(MANAGEMENT_WAIT_TIMEOUT_MILLIS - MIN_WAIT_TIME_MILLIS);
+      }
+      managementRequested.compareAndSet(true, false);

Review Comment:
   I think this can be a set() or even lazySet(), unless I missed something.
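Here is why a plain `set(false)` is enough. Both `requestManagement()` and the waiter touch the flag only while holding its monitor, and the loop has just observed `true`, so no other thread can change the flag before the reset.

There is a separate issue. In the diff, the `while` loop calls `wait()` again after every timeout, so `MANAGEMENT_WAIT_TIMEOUT_MILLIS` does not actually bound the wait. The removed `awaitManagementNanos` did return periodically, because `poll` takes a timeout. The old comment says that periodic return matters, since tasks can become ready without a signal, unless something else requests management periodically.

The sketch below is a hedged illustration, not the PR's code. It tracks a deadline so the wait returns on timeout. The `ManagementSignal` class and the boolean return value are assumptions.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ManagementSignal
{
  private final AtomicBoolean managementRequested = new AtomicBoolean(false);

  /** Non-blocking for callers: sets the flag and wakes the single management thread. */
  void requestManagement()
  {
    synchronized (managementRequested) {
      managementRequested.set(true);
      managementRequested.notify();
    }
  }

  /**
   * Waits until management is requested or the timeout elapses, whichever comes first.
   * Returns true if a request was consumed, false if the wait timed out.
   */
  boolean awaitManagementRequest(long timeoutMillis) throws InterruptedException
  {
    final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    synchronized (managementRequested) {
      // Loop guards against spurious wakeups; the deadline keeps the total wait bounded.
      while (!managementRequested.get()) {
        final long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime());
        if (remainingMillis <= 0) {
          return false;
        }
        managementRequested.wait(remainingMillis);
      }
      // We hold the monitor and just observed `true`; no other thread can flip the flag
      // here, so set(false) is equivalent to compareAndSet(true, false).
      managementRequested.set(false);
      return true;
    }
  }
}
```

A management loop would then call `manageTasks()` followed by `awaitManagementRequest(timeout)` while the queue is active. It proceeds to the next round whether the call returned `true` (signalled) or `false` (periodic).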



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -692,7 +595,7 @@ private ListenableFuture<TaskStatus> attachCallbacks(final Task task, final List
           @Override
           public void onSuccess(final TaskStatus status)
           {
-            log.info("Received %s status for task: %s", status.getStatusCode(), status.getId());
+            log.info("Received status [%s] for task [%s]", status.getStatusCode(), status.getId());

Review Comment:
   :+1: 
   



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
   /**
    * Main task runner management loop. Meant to run forever, or, at least until we're stopped.
    */
-  private void manage() throws InterruptedException
+  private void runTaskManagement() throws InterruptedException
   {
-    log.info("Beginning management in %s.", config.getStartDelay());
-    Thread.sleep(config.getStartDelay().getMillis());
-
     // Ignore return value- we'll get the IDs and futures from getKnownTasks later.
     taskRunner.restore();
 
     while (active) {
-      manageInternal();
-
-      // awaitNanos because management may become necessary without this condition signalling,
-      // due to e.g. tasks becoming ready when other folks mess with the TaskLockbox.
-      awaitManagementNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS);
+      manageTasks();
+      awaitManagementRequest();
     }
   }
 
   @VisibleForTesting
-  void manageInternal()
+  void manageTasks()
   {
-    Set<String> knownTaskIds = new HashSet<>();
-    Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new HashMap<>();
-
-    giant.lock();
-
-    try {
-      manageInternalCritical(knownTaskIds, runnerTaskFutures);
-    }
-    finally {
-      giant.unlock();
-    }
-
-    manageInternalPostCritical(knownTaskIds, runnerTaskFutures);
+    runReadyTasks();
+    killUnknownTasks();
   }
 
 
   /**
-   * Management loop critical section tasks.
-   *
-   * @param knownTaskIds will be modified - filled with known task IDs
-   * @param runnerTaskFutures will be modified - filled with futures related to getting the running tasks
+   * Submits ready tasks to the TaskRunner.
+   * <p>
+   * This method should be called only by the management thread.
    */
-  @GuardedBy("giant")
-  private void manageInternalCritical(
-      final Set<String> knownTaskIds,
-      final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures
-  )
+  private synchronized void runReadyTasks()
   {
     // Task futures available from the taskRunner
+    final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new HashMap<>();
     for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) {
-      if (!recentlyCompletedTasks.contains(workItem.getTaskId())) {
-        // Don't do anything with tasks that have recently finished; notifyStatus will handle it.
+      if (!recentlyCompletedTaskIds.contains(workItem.getTaskId())) {
+        // Don't do anything with recently completed tasks; notifyStatus will handle it.
         runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
       }
     }
+
     // Attain futures for all active tasks (assuming they are ready to run).
-    // Copy tasks list, as notifyStatus may modify it.
-    for (final Task task : ImmutableList.copyOf(tasks.values())) {
-      if (recentlyCompletedTasks.contains(task.getId())) {
-        // Don't do anything with tasks that have recently finished; notifyStatus will handle it.
-        continue;
+    for (final String taskId : Lists.newArrayList(activeTaskIdQueue)) {
+      final Task task = tasks.get(taskId);
+      if (task == null || recentlyCompletedTaskIds.contains(taskId)) {
+        // Don't do anything for unknown tasks or recently completed tasks
+      } else if (submittedTaskIds.contains(taskId)) {
+        // Re-trigger execution of pending task to avoid unnecessary delays
+        // see https://github.com/apache/druid/pull/6991
+        if (isTaskPending(task)) {
+          taskRunner.run(task);
+        }
+      } else if (runnerTaskFutures.containsKey(taskId)) {
+        attachCallbacks(task, runnerTaskFutures.get(taskId));
+        submittedTaskIds.add(taskId);
+      } else if (isTaskReady(task)) {
+        log.info("Asking taskRunner to run ready task [%s].", taskId);
+        attachCallbacks(task, taskRunner.run(task));
+        submittedTaskIds.add(taskId);
+      } else {
+        // Release all locks (possibly acquired by task.isReady()) if task is not ready
+        taskLockbox.unlockAll(task);
       }
+    }
+  }
 
-      knownTaskIds.add(task.getId());
-
-      if (!taskFutures.containsKey(task.getId())) {
-        final ListenableFuture<TaskStatus> runnerTaskFuture;
-        if (runnerTaskFutures.containsKey(task.getId())) {
-          runnerTaskFuture = runnerTaskFutures.get(task.getId());
-        } else {
-          // Task should be running, so run it.
-          final boolean taskIsReady;
-          try {
-            taskIsReady = task.isReady(taskActionClientFactory.create(task));
-          }
-          catch (Exception e) {
-            log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
-            final String errorMessage;
-            if (e instanceof MaxAllowedLocksExceededException) {
-              errorMessage = e.getMessage();
-            } else {
-              errorMessage = "Failed while waiting for the task to be ready to run. "
-                                          + "See overlord logs for more details.";
-            }
-            notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage);
-            continue;
-          }
-          if (taskIsReady) {
-            log.info("Asking taskRunner to run: %s", task.getId());
-            runnerTaskFuture = taskRunner.run(task);
-          } else {
-            // Task.isReady() can internally lock intervals or segments.
-            // We should release them if the task is not ready.
-            taskLockbox.unlockAll(task);
-            continue;
-          }
-        }
-        taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));
-      } else if (isTaskPending(task)) {
-        // if the taskFutures contain this task and this task is pending, also let the taskRunner
-        // to run it to guarantee it will be assigned to run
-        // see https://github.com/apache/druid/pull/6991
-        taskRunner.run(task);
+  private boolean isTaskReady(Task task)
+  {
+    try {
+      return task.isReady(taskActionClientFactory.create(task));
+    }
+    catch (Exception e) {
+      log.warn(e, "Error while checking if task [%s] is ready to run.", task.getId());
+      final String errorMessage;
+      if (e instanceof MaxAllowedLocksExceededException) {
+        errorMessage = e.getMessage();
+      } else {
+        errorMessage = "Failed while waiting for the task to be ready to run. "
+                       + "See overlord logs for more details.";
       }
+      notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage);
+      return false;
     }
   }
 
-  @VisibleForTesting
-  private void manageInternalPostCritical(
-      final Set<String> knownTaskIds,
-      final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures
-  )
+  /**
+   * Kills tasks not present in the set of known tasks.
+   */
+  private void killUnknownTasks()
   {
-    // Kill tasks that shouldn't be running
-    final Set<String> tasksToKill = Sets.difference(runnerTaskFutures.keySet(), knownTaskIds);
-    if (!tasksToKill.isEmpty()) {
-      log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());
-
-      // On large installations running several thousands of tasks,
-      // concatenating the list of known task ids can be compupationally expensive.
-      final boolean logKnownTaskIds = log.isDebugEnabled();
-      final String reason = logKnownTaskIds
-              ? StringUtils.format("Task is not in knownTaskIds[%s]", knownTaskIds)
-              : "Task is not in knownTaskIds";
-
-      for (final String taskId : tasksToKill) {
-        try {
-          taskRunner.shutdown(taskId, reason);
-        }
-        catch (Exception e) {
-          log.warn(e, "TaskRunner failed to clean up task: %s", taskId);
-        }
+    final Set<String> knownTaskIds = tasks.keySet();

Review Comment:
   NB: the earlier PR https://github.com/apache/druid/pull/12099 may also be relevant here for background and history.
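For reference, the step being replaced (the removed `manageInternalPostCritical`) does three things. It computes runner task IDs minus known task IDs, builds the potentially very large reason string only when debug logging is on, and isolates shutdown failures per task. Below is a minimal sketch of that logic. It assumes a hypothetical `BiConsumer` in place of `taskRunner.shutdown(taskId, reason)` and a boolean in place of `log.isDebugEnabled()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.BiConsumer;

class KillUnknownTasksSketch
{
  /** Asks the runner to shut down tasks the queue does not know about; returns the IDs shut down. */
  static List<String> killUnknownTasks(
      Set<String> runnerTaskIds,
      Set<String> knownTaskIds,
      boolean debugEnabled,
      BiConsumer<String, String> shutdown
  )
  {
    // Work on a copy so neither input set (possibly a live map view) is modified.
    final Set<String> tasksToKill = new TreeSet<>(runnerTaskIds);
    tasksToKill.removeIf(knownTaskIds::contains);

    // Concatenating thousands of IDs is expensive, so include them only at debug level.
    final String reason = debugEnabled
                          ? String.format("Task is not in knownTaskIds[%s]", knownTaskIds)
                          : "Task is not in knownTaskIds";

    final List<String> killed = new ArrayList<>();
    for (String taskId : tasksToKill) {
      try {
        shutdown.accept(taskId, reason);
        killed.add(taskId);
      }
      catch (Exception e) {
        // One failing shutdown must not prevent the remaining ones.
        System.err.printf("TaskRunner failed to clean up task [%s]: %s%n", taskId, e);
      }
    }
    return killed;
  }
}
```

Because the runner IDs are copied before filtering, the caller's sets are left untouched. This means a live view such as `tasks.keySet()` can be passed as `knownTaskIds`.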



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] kfaraz commented on pull request #14293: Remove `giant` lock from Overlord TaskQueue

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on PR #14293:
URL: https://github.com/apache/druid/pull/14293#issuecomment-1550636710

   Thanks for the feedback, @jasonk000 ! I have responded to your comments and plan to make changes/add more tests wherever necessary.

