Posted to commits@hive.apache.org by se...@apache.org on 2017/11/27 22:39:07 UTC

hive git commit: HIVE-18073 : AM may assert when its guaranteed task count is reduced (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master 44ef59915 -> 2b730524f


HIVE-18073 : AM may assert when its guaranteed task count is reduced (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2b730524
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2b730524
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2b730524

Branch: refs/heads/master
Commit: 2b730524f19420df120ec73ef972ad244ae380c1
Parents: 44ef599
Author: sergey <se...@apache.org>
Authored: Mon Nov 27 14:33:22 2017 -0800
Committer: sergey <se...@apache.org>
Committed: Mon Nov 27 14:33:22 2017 -0800

----------------------------------------------------------------------
 .../tezplugins/LlapTaskSchedulerService.java    | 133 ++++++++++---------
 1 file changed, 73 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
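
In outline, the change moves the guaranteed-slot ("duck") accounting out of schedulePendingTasks and into scheduleTask, so a duck is only consumed (or revoked from a lower-priority task) once host selection has actually succeeded; per the commit title, the earlier ordering could leave the count off and trip the AssertionError in the revocation path when the guaranteed total was later reduced. A minimal sketch of the corrected ordering follows; the names Scheduler, Task and selectHost are simplified stand-ins, not the actual Hive classes.

  // Simplified sketch of the post-fix ordering; not the real Hive classes.
  // A "duck" is an unused guaranteed-execution slot tracked by the scheduler.
  final class Scheduler {
    private int unusedGuaranteed;

    // Placement is attempted first; the duck count is only touched for a
    // task that was actually placed, so a failed attempt cannot leak a duck.
    synchronized boolean scheduleTask(Task task) {
      if (!selectHost(task)) {
        return false;              // nothing scheduled, nothing consumed
      }
      if (unusedGuaranteed > 0) {
        --unusedGuaranteed;        // consume a spare duck for the placed task
        task.guaranteed = true;
      }
      // The real code may instead revoke a duck from a lower-priority task
      // here and send the downgrade message outside the write lock.
      return true;
    }

    private boolean selectHost(Task task) {
      return task.hostAvailable;   // stand-in for the real host selection
    }
  }

  final class Task {
    boolean guaranteed;
    boolean hostAvailable = true;
  }
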


http://git-wip-us.apache.org/repos/asf/hive/blob/2b730524/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 66f3d2e..e97a267 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -431,21 +431,21 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           delta -= totalUpdated;
           WM_LOG.info("Distributed " + totalUpdated);
         }
-        WM_LOG.info("Setting unused: " + unusedGuaranteed + " plus " + delta);
-        unusedGuaranteed += delta;
+        int result = (unusedGuaranteed += delta);
+        WM_LOG.info("Setting unused to " + result + " based on remaining delta " + delta);
       } else {
         delta = -delta;
         if (delta <= unusedGuaranteed) {
           // Somebody took away our unwanted ducks.
-          WM_LOG.info("Setting unused: " + unusedGuaranteed + " minus " + delta);
-          unusedGuaranteed -= delta;
+          int result = (unusedGuaranteed -= delta);
+          WM_LOG.info("Setting unused to " + result + " based on full delta " + delta);
           return;
         } else {
           delta -= unusedGuaranteed;
           unusedGuaranteed = 0;
           toUpdate = new ArrayList<>();
           int totalUpdated = revokeGuaranteed(delta, null, toUpdate);
-          WM_LOG.info("Unused is 0; revoked " + totalUpdated + " / " + delta);
+          WM_LOG.info("Setting unused to 0; revoked " + totalUpdated + " / " + delta);
           // We must be able to take away the requisite number; if we can't, where'd the ducks go?
           if (delta != totalUpdated) {
             throw new AssertionError("Failed to revoke " + delta + " guaranteed tasks locally");
@@ -1109,7 +1109,10 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     List<TaskInfo> toUpdate = new ArrayList<>(1);
     int updatedCount = distributeGuaranteed(1, null, toUpdate);
     assert updatedCount <= 1;
-    unusedGuaranteed += (1 - updatedCount);
+    if (updatedCount == 0) {
+      int result = ++unusedGuaranteed;
+      WM_LOG.info("Returning the unused duck; unused is now " + result);
+    }
     if (toUpdate.isEmpty()) return null;
     assert toUpdate.size() == 1;
     return toUpdate.get(0);
@@ -1491,7 +1494,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
               && !removeFromRunningTaskMap(guaranteedTasks, task, taskInfo)) {
             Preconditions.checkState(false, "runningTasks should contain an entry if the task" +
               " was in running state. Caused by task: {}", task);
-            // TODO: [!!!] is it possible that we are losing the guaranteed task here? dbl check paths
           }
         }
       } else {
@@ -1543,7 +1545,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
   @VisibleForTesting
   protected void schedulePendingTasks() throws InterruptedException {
-    Ref<TaskInfo> downgradedTask = null;
+    Ref<TaskInfo> downgradedTask = new Ref<>(null);
     writeLock.lock();
     try {
       if (LOG.isDebugEnabled()) {
@@ -1565,31 +1567,10 @@ public class LlapTaskSchedulerService extends TaskScheduler {
             dagStats.registerDelayedAllocation();
           }
           taskInfo.triedAssigningTask();
-          if (unusedGuaranteed > 0) {
-            synchronized (taskInfo) {
-              assert !taskInfo.isPendingUpdate; // No updates before it's running.
-              taskInfo.isGuaranteed = true;
-            }
-            --unusedGuaranteed;
-          } else {
-            // We could be scheduling a guaranteed task when a higher priority task cannot be
-            // scheduled. Try to take a duck away from a lower priority task here.
-            downgradedTask = new Ref<>(null);
-            if (findGuaranteedToReallocate(taskInfo, downgradedTask)) {
-              // We are revoking another duck; don't wait. We could also give the duck
-              // to this task in the callback instead.
-              synchronized (taskInfo) {
-                assert !taskInfo.isPendingUpdate; // No updates before it's running.
-                taskInfo.isGuaranteed = true;
-              }
-              // Note: after this, the method MUST send the downgrade message to downgradedTask
-              //       (outside of the writeLock, preferably), before exiting.
-            }
-          }
-          ScheduleResult scheduleResult = scheduleTask(taskInfo, totalResource);
+          ScheduleResult scheduleResult = scheduleTask(taskInfo, totalResource, downgradedTask);
+          // Note: we must handle downgradedTask after this. We do it at the end, outside the lock.
           if (LOG.isDebugEnabled()) {
-            LOG.debug("ScheduleResult for Task: {} = {}", taskInfo,
-                scheduleResult);
+            LOG.debug("ScheduleResult for Task: {} = {}", taskInfo, scheduleResult);
           }
           if (scheduleResult == ScheduleResult.SCHEDULED) {
             taskIter.remove();
@@ -1699,8 +1680,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     } finally {
       writeLock.unlock();
     }
-    if (downgradedTask != null && downgradedTask.value != null) {
-      WM_LOG.info("downgrading " + downgradedTask.value.attemptId);
+    if (downgradedTask.value != null) {
+      WM_LOG.info("Downgrading " + downgradedTask.value.attemptId);
       checkAndSendGuaranteedStateUpdate(downgradedTask.value);
     }
   }
@@ -1722,38 +1703,70 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     return sb.toString();
   }
 
-  private ScheduleResult scheduleTask(TaskInfo taskInfo, Resource totalResource) {
+  private ScheduleResult scheduleTask(TaskInfo taskInfo, Resource totalResource,
+      Ref<TaskInfo> downgradedTask) {
     Preconditions.checkNotNull(totalResource, "totalResource can not be null");
     // If there's no memory available, fail
     if (totalResource.getMemory() <= 0) {
       return SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY.scheduleResult;
     }
     SelectHostResult selectHostResult = selectHost(taskInfo);
-    if (selectHostResult.scheduleResult == ScheduleResult.SCHEDULED) {
-      NodeInfo nodeInfo = selectHostResult.nodeInfo;
-      Container container =
-          containerFactory.createContainer(nodeInfo.getResourcePerExecutor(), taskInfo.priority,
-              nodeInfo.getHost(),
-              nodeInfo.getRpcPort(),
-              nodeInfo.getServiceAddress());
-      writeLock.lock(); // While updating local structures
-      // Note: this is actually called under the epic writeLock in schedulePendingTasks
-      try {
-        // The canAccept part of this log message does not account for this allocation.
-        assignedTaskCounter.incrementAndGet();
-        LOG.info("Assigned #{}, task={} on node={}, to container={}",
-            assignedTaskCounter.get(),
-            taskInfo, nodeInfo.toShortString(), container.getId());
-        dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks,
-            nodeInfo.getHost());
-        taskInfo.setAssignmentInfo(nodeInfo, container.getId(), clock.getTime());
-        registerRunningTask(taskInfo);
-        nodeInfo.registerTaskScheduled();
-      } finally {
-        writeLock.unlock();
-      }
-      getContext().taskAllocated(taskInfo.task, taskInfo.clientCookie, container);
+    if (selectHostResult.scheduleResult != ScheduleResult.SCHEDULED) {
+      return selectHostResult.scheduleResult;
     }
+    if (unusedGuaranteed > 0) {
+      boolean wasGuaranteed = false;
+      synchronized (taskInfo) {
+        assert !taskInfo.isPendingUpdate; // No updates before it's running.
+        wasGuaranteed = taskInfo.isGuaranteed;
+        taskInfo.isGuaranteed = true;
+      }
+      if (wasGuaranteed) {
+        // This should never happen - we only schedule one attempt once.
+        WM_LOG.error("The task had guaranteed flag set before scheduling: " + taskInfo);
+      } else {
+        int result = --unusedGuaranteed;
+        WM_LOG.info("Using an unused duck for " + taskInfo.attemptId
+            + "; unused is now " + result);
+      }
+    } else {
+      // We could be scheduling a guaranteed task when a higher priority task cannot be
+      // scheduled. Try to take a duck away from a lower priority task here.
+      if (findGuaranteedToReallocate(taskInfo, downgradedTask)) {
+        // We are revoking another duck; don't wait. We could also give the duck
+        // to this task in the callback instead.
+        synchronized (taskInfo) {
+          assert !taskInfo.isPendingUpdate; // No updates before it's running.
+          taskInfo.isGuaranteed = true;
+        }
+        // Note: after this, the caller MUST send the downgrade message to downgradedTask
+        //       (outside of the writeLock, preferably), before exiting.
+      }
+    }
+
+    NodeInfo nodeInfo = selectHostResult.nodeInfo;
+    Container container =
+        containerFactory.createContainer(nodeInfo.getResourcePerExecutor(), taskInfo.priority,
+            nodeInfo.getHost(),
+            nodeInfo.getRpcPort(),
+            nodeInfo.getServiceAddress());
+    writeLock.lock(); // While updating local structures
+    // Note: this is actually called under the epic writeLock in schedulePendingTasks
+    try {
+      // The canAccept part of this log message does not account for this allocation.
+      assignedTaskCounter.incrementAndGet();
+      LOG.info("Assigned #{}, task={} on node={}, to container={}",
+          assignedTaskCounter.get(),
+          taskInfo, nodeInfo.toShortString(), container.getId());
+      dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks,
+          nodeInfo.getHost());
+      taskInfo.setAssignmentInfo(nodeInfo, container.getId(), clock.getTime());
+      registerRunningTask(taskInfo);
+      nodeInfo.registerTaskScheduled();
+    } finally {
+      writeLock.unlock();
+    }
+    getContext().taskAllocated(taskInfo.task, taskInfo.clientCookie, container);
     return selectHostResult.scheduleResult;
   }
 
@@ -2818,7 +2831,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     communicator.startUpdateGuaranteed(ti.attemptId, ti.assignedNode, newState, UPDATE_CALLBACK, ti);
   }
 
-
+  @VisibleForTesting
   int getUnusedGuaranteedCount() {
     return unusedGuaranteed;
   }