Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2021/03/25 16:35:44 UTC

[GitHub] [hive] pgaref opened a new pull request #2123: HIVE-24472: Optimize LlapTaskSchedulerService::preemptTasksFromMap

pgaref opened a new pull request #2123:
URL: https://github.com/apache/hive/pull/2123


   ### What changes were proposed in this pull request?
   Move the (compute-intensive) selection of preemption candidates to a separate thread.
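
   As a rough illustration of the idea only (not the code in this PR), the hand-off between the scheduling loop and a dedicated preemption thread could look like the sketch below. The queue names `highPriorityTaskQueue` and `preemptionCandidates` follow the diff in this thread; the class, the `String` task type, and `selectVictimFor` are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: names and types are illustrative, not Hive's implementation.
final class PreemptionOffloadSketch {
  // Tasks that need a slot, handed from the scheduling loop to the worker thread.
  private final BlockingQueue<String> highPriorityTaskQueue = new LinkedBlockingQueue<>();
  // Victims chosen by the worker thread, drained later by the scheduling loop.
  private final BlockingQueue<String> preemptionCandidates = new LinkedBlockingQueue<>();
  private final ExecutorService worker = Executors.newFixedThreadPool(1);

  void start() {
    worker.submit(() -> {
      try {
        while (!Thread.currentThread().isInterrupted()) {
          String task = highPriorityTaskQueue.take();      // blocks until work arrives
          preemptionCandidates.add(selectVictimFor(task)); // expensive step, off the hot path
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();                // expected on shutdown
      }
    });
  }

  // Stand-in for the compute-intensive search over running tasks.
  private static String selectVictimFor(String task) {
    return "victim-for-" + task;
  }

  // Called from the scheduling loop: enqueue and return immediately.
  void requestPreemption(String task) {
    highPriorityTaskQueue.add(task);
  }

  // Called from the scheduling loop: take whatever is ready without blocking.
  List<String> drainCandidates() {
    List<String> ready = new ArrayList<>();
    preemptionCandidates.drainTo(ready);
    return ready;
  }

  // Convenience for demos and tests: wait up to timeoutMs for one candidate, or return null.
  String awaitCandidate(long timeoutMs) {
    try {
      return preemptionCandidates.poll(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }

  void stop() {
    worker.shutdownNow(); // interrupts the blocked take()
    try {
      worker.awaitTermination(1, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    PreemptionOffloadSketch sketch = new PreemptionOffloadSketch();
    sketch.start();
    sketch.requestPreemption("task-1");
    System.out.println(sketch.awaitCandidate(1000));
    sketch.stop();
  }
}
```

   The point of the design is that the scheduling loop only enqueues and drains, so the cost of searching for a victim no longer adds to the length of a scheduling cycle.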
   
   
   ### Why are the changes needed?
   Keep task scheduling cycles as fast as possible by removing non-essential work from that loop.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   TestLlapTaskSchedulerService
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pgaref commented on a change in pull request #2123: HIVE-24472: Optimize LlapTaskSchedulerService::preemptTasksFromMap

Posted by GitBox <gi...@apache.org>.
pgaref commented on a change in pull request #2123:
URL: https://github.com/apache/hive/pull/2123#discussion_r612757340



##########
File path: llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
##########
@@ -1954,6 +1911,37 @@ protected void schedulePendingTasks() throws InterruptedException {
           break;
         }
       }
+      // Finally take care of preemption requests that can unblock higher-pri tasks.
+      // This removes preemptable tasks from the runningList and sends out a preempt request to the system.
+      // Subsequent tasks will be scheduled once the de-allocate request for the preempted task is processed.
+      while (!preemptionCandidates.isEmpty()) {
+        TaskInfo toPreempt = preemptionCandidates.take();
+        // 1. task has not terminated
+        if (toPreempt.isGuaranteed != null) {
+          String host = toPreempt.getAssignedNode().getHost();
+           // 2. is currently assigned 3. no preemption pending on that Host
+          if (toPreempt.getState() == TaskInfo.State.ASSIGNED &&
+              (pendingPreemptionsPerHost.get(host) == null || pendingPreemptionsPerHost.get(host).intValue() == 0)) {
+            LOG.debug("Preempting task took {} ms {}", (clock.getTime() - toPreempt.getPreemptedTime()), toPreempt);

Review comment:
       I left it in mostly to see how fast preemption messages are propagated, but I agree it is not very useful. Removed.






[GitHub] [hive] pgaref commented on a change in pull request #2123: HIVE-24472: Optimize LlapTaskSchedulerService::preemptTasksFromMap

Posted by GitBox <gi...@apache.org>.
pgaref commented on a change in pull request #2123:
URL: https://github.com/apache/hive/pull/2123#discussion_r612757788



##########
File path: llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
##########
@@ -3049,7 +3131,7 @@ boolean isUpdateInProgress() {
       return isPendingUpdate;
     }
 
-    TezTaskAttemptID getAttemptId() {
+    synchronized TezTaskAttemptID getAttemptId() {

Review comment:
       Ack, removed.






[GitHub] [hive] pgaref merged pull request #2123: HIVE-24472: Optimize LlapTaskSchedulerService::preemptTasksFromMap

Posted by GitBox <gi...@apache.org>.
pgaref merged pull request #2123:
URL: https://github.com/apache/hive/pull/2123


   




[GitHub] [hive] pgaref commented on a change in pull request #2123: HIVE-24472: Optimize LlapTaskSchedulerService::preemptTasksFromMap

Posted by GitBox <gi...@apache.org>.
pgaref commented on a change in pull request #2123:
URL: https://github.com/apache/hive/pull/2123#discussion_r612758022



##########
File path: llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
##########
@@ -2324,7 +2278,114 @@ private void maybeAddToDelayedTaskQueue(TaskInfo taskInfo) {
     }
   }
 
+  private void maybeAddToHighPriorityTaskQueue(TaskInfo taskInfo) {
+    // Only add task if its not already in the Queue AND there no mores than HOSTS tasks there already
+    // as we are performing up to HOSTS preemptions at a time
+    if (!taskInfo.isInHighPriorityQueue() && highPriorityTaskQueue.size() < activeInstances.size()) {
+      taskInfo.setInHighPriorityQueue(true);
+      highPriorityTaskQueue.add(taskInfo);
+    }
+  }
+
   // ------ Inner classes defined after this point ------
+  class PreemptionSchedulerCallable implements Callable<Void> {
+    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+    @Override
+    public Void call() {
+      while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
+        try {
+          TaskInfo taskInfo = getNextTask();
+          // Tasks can exist in the queue even after they have been scheduled.
+          // Process task Preemption only if the task is still in PENDING state.
+          processTaskPreemption(taskInfo);
+
+        } catch (InterruptedException e) {
+          if (isShutdown.get()) {
+            LOG.info("PreemptTaskScheduler thread interrupted after shutdown");
+            break;
+          } else {
+            LOG.warn("PreemptTaskScheduler thread interrupted before being shutdown");
+            throw new RuntimeException("PreemptTaskScheduler thread interrupted without being shutdown", e);
+          }
+        }
+      }
+      return null;
+    }
+
+    private void processTaskPreemption(TaskInfo taskInfo) {
+      if (shouldAttemptTask(taskInfo) && tryTaskPreemption(taskInfo)) {
+        trySchedulingPendingTasks();
+      }
+      // Enables scheduler to reAdd task in Queue if needed
+      taskInfo.setInHighPriorityQueue(false);
+    }
+
+    private boolean tryTaskPreemption(TaskInfo taskInfo) {
+      // Find a lower priority task that can be preempted on a particular host.
+      // ONLY if there's no pending preemptions on that host to avoid preempting twice for a task.
+      Set<String> potentialHosts = null; // null => preempt on any host.
+      readLock.lock();
+      try {
+        // Protect against a bad location being requested.
+        if (taskInfo.requestedHosts != null && taskInfo.requestedHosts.length != 0) {
+          potentialHosts = Sets.newHashSet(taskInfo.requestedHosts);
+        }
+        if (potentialHosts != null) {
+          // Preempt on specific host
+          boolean shouldPreempt = true;
+          for (String host : potentialHosts) {
+            // Preempt only if there are no pending preemptions on the same host
+            // When the preemption registers, the request at the highest priority will be given the slot,
+            // even if the initial preemption was caused by some other task.
+            // TODO Maybe register which task the preemption was for, to avoid a bad non-local allocation.
+            MutableInt pendingHostPreemptions = pendingPreemptionsPerHost.get(host);
+            if (pendingHostPreemptions != null && pendingHostPreemptions.intValue() > 0) {
+              shouldPreempt = false;
+              LOG.debug("No preempt candidate for task={}. Found an existing preemption request on host={}, pendingPreemptionCount={}",
+                  taskInfo.task, host, pendingHostPreemptions.intValue());
+              break;
+            }
+          }
+
+          if (!shouldPreempt) {
+            LOG.debug("No preempt candidate for {} on potential hosts={}. An existing preemption request exists",
+                taskInfo.task, potentialHosts);
+            return false;
+          }
+        } else {
+          // Unknown requested host -- Request for a preemption if there's none pending. If a single preemption is pending,
+          // and this is the next task to be assigned, it will be assigned once that slot becomes available.
+          if (pendingPreemptions.get() != 0) {
+            LOG.debug("Skipping preempt candidate since there are {} pending preemption request. For task={}",
+                pendingPreemptions.get(), taskInfo);
+            return false;
+          }
+        }
+
+        LOG.debug("Attempting preempt candidate for task={}, priority={} on potential hosts={}. pendingPreemptions={}",
+            taskInfo.task, taskInfo.priority, potentialHosts == null ? "ANY" : potentialHosts, pendingPreemptions.get());
+        return addTaskPreemptionCandidate(speculativeTasks, taskInfo, potentialHosts) ||
+            addTaskPreemptionCandidate(guaranteedTasks, taskInfo, potentialHosts);
+      } finally {
+        readLock.unlock();
+      }
+    }
+
+    public void shutdown() {
+      isShutdown.set(true);
+    }
+
+    public TaskInfo getNextTask() throws InterruptedException {

Review comment:
       sure






[GitHub] [hive] pgaref commented on a change in pull request #2123: HIVE-24472: Optimize LlapTaskSchedulerService::preemptTasksFromMap

Posted by GitBox <gi...@apache.org>.
pgaref commented on a change in pull request #2123:
URL: https://github.com/apache/hive/pull/2123#discussion_r612766954



##########
File path: llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
##########
@@ -429,6 +437,11 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock
     delayedTaskSchedulerExecutor =
         MoreExecutors.listeningDecorator(delayedTaskSchedulerExecutorRaw);
 
+    ExecutorService preemptTaskSchedulerExecutorRaw = Executors.newFixedThreadPool(1,

Review comment:
       Well, I agree, but the actual LLAP preemption conf we are using https://github.com/apache/hive/blob/61d5c641b2e414c7b7dfd92f2b402db3583507c8/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java#L5023
   
   actually targets the TaskExecutorService within the LlapDaemon (waitQueue tasks vs. running tasks) and not the LlapTaskSchedulerService. In a sense this is a different type of preemption, and I am not sure we should just use the same conf here.






[GitHub] [hive] mustafaiman commented on pull request #2123: HIVE-24472: Optimize LlapTaskSchedulerService::preemptTasksFromMap

Posted by GitBox <gi...@apache.org>.
mustafaiman commented on pull request #2123:
URL: https://github.com/apache/hive/pull/2123#issuecomment-819061922


   👍 




[GitHub] [hive] pgaref commented on pull request #2123: HIVE-24472: Optimize LlapTaskSchedulerService::preemptTasksFromMap

Posted by GitBox <gi...@apache.org>.
pgaref commented on pull request #2123:
URL: https://github.com/apache/hive/pull/2123#issuecomment-828292245


   This is now merged! Thanks @mustafaiman for the review! 




[GitHub] [hive] pgaref commented on pull request #2123: HIVE-24472: Optimize LlapTaskSchedulerService::preemptTasksFromMap

Posted by GitBox <gi...@apache.org>.
pgaref commented on pull request #2123:
URL: https://github.com/apache/hive/pull/2123#issuecomment-817729951


   @mustafaiman this is also ready for review when you have a sec!




[GitHub] [hive] mustafaiman commented on a change in pull request #2123: HIVE-24472: Optimize LlapTaskSchedulerService::preemptTasksFromMap

Posted by GitBox <gi...@apache.org>.
mustafaiman commented on a change in pull request #2123:
URL: https://github.com/apache/hive/pull/2123#discussion_r611864768



##########
File path: llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
##########
@@ -429,6 +437,11 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock
     delayedTaskSchedulerExecutor =
         MoreExecutors.listeningDecorator(delayedTaskSchedulerExecutorRaw);
 
+    ExecutorService preemptTaskSchedulerExecutorRaw = Executors.newFixedThreadPool(1,

Review comment:
       If preemption is turned off, we won't need this executor.
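
       One way to read this suggestion is to create the executor only when a preemption switch is on. The sketch below is an assumption-laden illustration: `preemptionEnabled` is a hypothetical constructor flag, not an existing Hive configuration key, and which setting (if any) should gate the scheduler-side preemption is left open in this thread.

```java
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: the flag and class are illustrative, not part of Hive.
final class OptionalPreemptionExecutorSketch {
  // Empty when preemption is off, so no pool object or thread is ever created.
  private final Optional<ExecutorService> preemptTaskSchedulerExecutor;

  OptionalPreemptionExecutorSketch(boolean preemptionEnabled) {
    this.preemptTaskSchedulerExecutor = preemptionEnabled
        ? Optional.of(Executors.newFixedThreadPool(1))
        : Optional.empty();
  }

  boolean hasPreemptExecutor() {
    return preemptTaskSchedulerExecutor.isPresent();
  }

  // Safe to call in both modes; does nothing when the executor was never created.
  void shutdown() {
    preemptTaskSchedulerExecutor.ifPresent(ExecutorService::shutdownNow);
  }
}
```

       Callers would then need to tolerate the absent executor everywhere it is used, which is the main cost of this approach.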

##########
File path: llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
##########
@@ -1954,6 +1911,37 @@ protected void schedulePendingTasks() throws InterruptedException {
           break;
         }
       }
+      // Finally take care of preemption requests that can unblock higher-pri tasks.
+      // This removes preemptable tasks from the runningList and sends out a preempt request to the system.
+      // Subsequent tasks will be scheduled once the de-allocate request for the preempted task is processed.
+      while (!preemptionCandidates.isEmpty()) {
+        TaskInfo toPreempt = preemptionCandidates.take();
+        // 1. task has not terminated
+        if (toPreempt.isGuaranteed != null) {
+          String host = toPreempt.getAssignedNode().getHost();
+           // 2. is currently assigned 3. no preemption pending on that Host
+          if (toPreempt.getState() == TaskInfo.State.ASSIGNED &&
+              (pendingPreemptionsPerHost.get(host) == null || pendingPreemptionsPerHost.get(host).intValue() == 0)) {
+            LOG.debug("Preempting task took {} ms {}", (clock.getTime() - toPreempt.getPreemptedTime()), toPreempt);

Review comment:
       This looks like a leftover from a debugging session.

##########
File path: llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
##########
@@ -2324,7 +2278,114 @@ private void maybeAddToDelayedTaskQueue(TaskInfo taskInfo) {
     }
   }
 
+  private void maybeAddToHighPriorityTaskQueue(TaskInfo taskInfo) {
+    // Only add task if its not already in the Queue AND there no mores than HOSTS tasks there already
+    // as we are performing up to HOSTS preemptions at a time
+    if (!taskInfo.isInHighPriorityQueue() && highPriorityTaskQueue.size() < activeInstances.size()) {
+      taskInfo.setInHighPriorityQueue(true);
+      highPriorityTaskQueue.add(taskInfo);
+    }
+  }
+
   // ------ Inner classes defined after this point ------
+  class PreemptionSchedulerCallable implements Callable<Void> {
+    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+    @Override
+    public Void call() {
+      while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
+        try {
+          TaskInfo taskInfo = getNextTask();
+          // Tasks can exist in the queue even after they have been scheduled.
+          // Process task Preemption only if the task is still in PENDING state.
+          processTaskPreemption(taskInfo);
+
+        } catch (InterruptedException e) {
+          if (isShutdown.get()) {
+            LOG.info("PreemptTaskScheduler thread interrupted after shutdown");
+            break;
+          } else {
+            LOG.warn("PreemptTaskScheduler thread interrupted before being shutdown");
+            throw new RuntimeException("PreemptTaskScheduler thread interrupted without being shutdown", e);
+          }
+        }
+      }
+      return null;
+    }
+
+    private void processTaskPreemption(TaskInfo taskInfo) {
+      if (shouldAttemptTask(taskInfo) && tryTaskPreemption(taskInfo)) {
+        trySchedulingPendingTasks();
+      }
+      // Enables scheduler to reAdd task in Queue if needed
+      taskInfo.setInHighPriorityQueue(false);
+    }
+
+    private boolean tryTaskPreemption(TaskInfo taskInfo) {
+      // Find a lower priority task that can be preempted on a particular host.
+      // ONLY if there's no pending preemptions on that host to avoid preempting twice for a task.
+      Set<String> potentialHosts = null; // null => preempt on any host.
+      readLock.lock();
+      try {
+        // Protect against a bad location being requested.
+        if (taskInfo.requestedHosts != null && taskInfo.requestedHosts.length != 0) {
+          potentialHosts = Sets.newHashSet(taskInfo.requestedHosts);
+        }
+        if (potentialHosts != null) {
+          // Preempt on specific host
+          boolean shouldPreempt = true;
+          for (String host : potentialHosts) {
+            // Preempt only if there are no pending preemptions on the same host
+            // When the preemption registers, the request at the highest priority will be given the slot,
+            // even if the initial preemption was caused by some other task.
+            // TODO Maybe register which task the preemption was for, to avoid a bad non-local allocation.
+            MutableInt pendingHostPreemptions = pendingPreemptionsPerHost.get(host);
+            if (pendingHostPreemptions != null && pendingHostPreemptions.intValue() > 0) {
+              shouldPreempt = false;
+              LOG.debug("No preempt candidate for task={}. Found an existing preemption request on host={}, pendingPreemptionCount={}",
+                  taskInfo.task, host, pendingHostPreemptions.intValue());
+              break;
+            }
+          }
+
+          if (!shouldPreempt) {
+            LOG.debug("No preempt candidate for {} on potential hosts={}. An existing preemption request exists",
+                taskInfo.task, potentialHosts);
+            return false;
+          }
+        } else {
+          // Unknown requested host -- Request for a preemption if there's none pending. If a single preemption is pending,
+          // and this is the next task to be assigned, it will be assigned once that slot becomes available.
+          if (pendingPreemptions.get() != 0) {
+            LOG.debug("Skipping preempt candidate since there are {} pending preemption request. For task={}",
+                pendingPreemptions.get(), taskInfo);
+            return false;
+          }
+        }
+
+        LOG.debug("Attempting preempt candidate for task={}, priority={} on potential hosts={}. pendingPreemptions={}",
+            taskInfo.task, taskInfo.priority, potentialHosts == null ? "ANY" : potentialHosts, pendingPreemptions.get());
+        return addTaskPreemptionCandidate(speculativeTasks, taskInfo, potentialHosts) ||
+            addTaskPreemptionCandidate(guaranteedTasks, taskInfo, potentialHosts);
+      } finally {
+        readLock.unlock();
+      }
+    }
+
+    public void shutdown() {
+      isShutdown.set(true);
+    }
+
+    public TaskInfo getNextTask() throws InterruptedException {

Review comment:
       getNextTask() does not add any value. Can you remove it and just call highPriorityTaskQueue.take() directly? It is called from only one place.

##########
File path: llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
##########
@@ -3049,7 +3131,7 @@ boolean isUpdateInProgress() {
       return isPendingUpdate;
     }
 
-    TezTaskAttemptID getAttemptId() {
+    synchronized TezTaskAttemptID getAttemptId() {

Review comment:
       attemptId is final. Why would we need synchronization to access it?
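
       To illustrate the point with a minimal, hypothetical class (not Hive's TaskInfo): under the Java memory model, a final field assigned in the constructor is visible to any thread that obtains the object after construction, provided `this` does not escape the constructor. Only the mutable field below needs a lock.

```java
// Hypothetical sketch: class and field names are illustrative.
final class AttemptHolderSketch {
  private final String attemptId;  // assigned once in the constructor, never changes
  private boolean pendingUpdate;   // mutable state, guarded by 'this'

  AttemptHolderSketch(String attemptId) {
    this.attemptId = attemptId;
  }

  // No synchronization needed: reads of a final field are safe across threads.
  String getAttemptId() {
    return attemptId;
  }

  // Mutable state still needs the lock for visibility and atomicity.
  synchronized boolean isPendingUpdate() {
    return pendingUpdate;
  }

  synchronized void setPendingUpdate(boolean pendingUpdate) {
    this.pendingUpdate = pendingUpdate;
  }
}
```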






[GitHub] [hive] mustafaiman commented on a change in pull request #2123: HIVE-24472: Optimize LlapTaskSchedulerService::preemptTasksFromMap

Posted by GitBox <gi...@apache.org>.
mustafaiman commented on a change in pull request #2123:
URL: https://github.com/apache/hive/pull/2123#discussion_r612782613



##########
File path: llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
##########
@@ -429,6 +437,11 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock
     delayedTaskSchedulerExecutor =
         MoreExecutors.listeningDecorator(delayedTaskSchedulerExecutorRaw);
 
+    ExecutorService preemptTaskSchedulerExecutorRaw = Executors.newFixedThreadPool(1,

Review comment:
       I checked that too and got confused. LlapTaskScheduler does the work of finding preemption candidates etc. even though preemption cannot occur in the end. Also, LlapTaskScheduler marks tasks as preempted and updates preemption stats even though nothing is preempted, because LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION is false. Am I understanding this correctly?
   
   This is obviously not a problem with this patch. I am just asking to understand. I'll +1 this regardless.



