You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/05/21 15:10:21 UTC

hadoop git commit: YARN-8248. Job hangs when a job requests a resource that its queue does not have. (Szilard Nemeth via Haibo Chen)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 3d2d9dbca -> f48fec83d


YARN-8248. Job hangs when a job requests a resource that its queue does not have. (Szilard Nemeth via Haibo Chen)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f48fec83
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f48fec83
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f48fec83

Branch: refs/heads/trunk
Commit: f48fec83d0f2d1a781a141ad7216463c5526321f
Parents: 3d2d9db
Author: Haibo Chen <ha...@apache.org>
Authored: Mon May 21 08:00:21 2018 -0700
Committer: Haibo Chen <ha...@apache.org>
Committed: Mon May 21 08:10:41 2018 -0700

----------------------------------------------------------------------
 .../scheduler/SchedulerUtils.java               | 159 ++++++++++----
 .../scheduler/fair/FSAppAttempt.java            |  10 +-
 .../scheduler/fair/FSParentQueue.java           |   3 +
 .../scheduler/fair/FairScheduler.java           | 115 +++++++++--
 .../scheduler/fair/FairSchedulerTestBase.java   |  64 ++++--
 .../scheduler/fair/TestFairScheduler.java       | 205 +++++++++++++++++++
 6 files changed, 481 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f48fec83/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index 9b3c20a..7de250d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -18,9 +18,14 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -40,6 +45,8 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException;
 import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
+import org.apache.hadoop.yarn.exceptions
+        .SchedulerInvalidResoureRequestException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AccessType;
@@ -61,12 +68,37 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 @Unstable
 public class SchedulerUtils {
 
+  /**
+   * Holds one {@link ResourceRequest} together with the requested resources
+   * that failed validation; the request is valid when that list is empty.
+   */
+  public static class MaxResourceValidationResult {
+    private final ResourceRequest resourceRequest;
+    private final List<ResourceInformation> invalidResources;
+
+    MaxResourceValidationResult(ResourceRequest resourceRequest,
+        List<ResourceInformation> invalidResources) {
+      this.resourceRequest = resourceRequest;
+      this.invalidResources = invalidResources;
+    }
+
+    public boolean isValid() {
+      return invalidResources.isEmpty();
+    }
+
+    @Override
+    public String toString() {
+      return "MaxResourceValidationResult{" + "resourceRequest="
+          + resourceRequest + ", invalidResources=" + invalidResources + '}';
+    }
+  }
+
   private static final Log LOG = LogFactory.getLog(SchedulerUtils.class);
 
-  private static final RecordFactory recordFactory = 
+  private static final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
 
-  public static final String RELEASED_CONTAINER = 
+  public static final String RELEASED_CONTAINER =
       "Container released by application";
 
   public static final String UPDATED_CONTAINER =
@@ -325,6 +357,22 @@ public class SchedulerUtils {
     }
   }
 
+  private static Map<String, ResourceInformation> getZeroResources(
+      Resource resource) {
+    // Maps resource name to its entry for every type whose value is zero.
+    final Map<String, ResourceInformation> zeroValued = Maps.newHashMap();
+    final int typeCount = ResourceUtils.getNumberOfKnownResourceTypes();
+
+    for (int idx = 0; idx < typeCount; idx++) {
+      final ResourceInformation info = resource.getResourceInformation(idx);
+      if (info.getValue() != 0L) {
+        continue;
+      }
+      zeroValued.put(info.getName(), info);
+    }
+    return zeroValued;
+  }
+
   @Private
   @VisibleForTesting
   static void checkResourceRequestAgainstAvailableResource(Resource reqResource,
@@ -339,47 +387,86 @@ public class SchedulerUtils {
             reqResourceName);
       }
 
-      final ResourceInformation availableRI =
-          availableResource.getResourceInformation(reqResourceName);
+      boolean valid = checkResource(requestedRI, availableResource);
+      if (!valid) {
+        throwInvalidResourceException(reqResource, availableResource,
+            reqResourceName);
+      }
+    }
+  }
 
-      long requestedResourceValue = requestedRI.getValue();
-      long availableResourceValue = availableRI.getValue();
-      int unitsRelation = UnitsConversionUtil
-          .compareUnits(requestedRI.getUnits(), availableRI.getUnits());
+  public static MaxResourceValidationResult
+      validateResourceRequestsAgainstQueueMaxResource(
+      ResourceRequest resReq, Resource availableResource)
+      throws SchedulerInvalidResoureRequestException {
+    final Resource reqResource = resReq.getCapability();
+    Map<String, ResourceInformation> resourcesWithZeroAmount =
+        getZeroResources(availableResource);
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Resources with zero amount: "
+          + Arrays.toString(resourcesWithZeroAmount.entrySet().toArray()));
+    }
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Requested resource information: " + requestedRI);
-        LOG.debug("Available resource information: " + availableRI);
-        LOG.debug("Relation of units: " + unitsRelation);
-      }
+    List<ResourceInformation> invalidResources = Lists.newArrayList();
+    for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) {
+      final ResourceInformation requestedRI =
+          reqResource.getResourceInformation(i);
+      final String reqResourceName = requestedRI.getName();
 
-      // requested resource unit is less than available resource unit
-      // e.g. requestedUnit: "m", availableUnit: "K")
-      if (unitsRelation < 0) {
-        availableResourceValue =
-            UnitsConversionUtil.convert(availableRI.getUnits(),
-                requestedRI.getUnits(), availableRI.getValue());
-
-        // requested resource unit is greater than available resource unit
-        // e.g. requestedUnit: "G", availableUnit: "M")
-      } else if (unitsRelation > 0) {
-        requestedResourceValue =
-            UnitsConversionUtil.convert(requestedRI.getUnits(),
-                availableRI.getUnits(), requestedRI.getValue());
+      if (resourcesWithZeroAmount.containsKey(reqResourceName)
+          && requestedRI.getValue() > 0) {
+        invalidResources.add(requestedRI);
       }
+    }
+    return new MaxResourceValidationResult(resReq, invalidResources);
+  }
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Requested resource value after conversion: " +
-                requestedResourceValue);
-        LOG.info("Available resource value after conversion: " +
-                availableResourceValue);
-      }
+  /**
+   * Checks requested ResourceInformation against available Resource.
+   * @param requestedRI the requested resource entry (name, value and units)
+   * @param availableResource resource holding the same-named available entry
+   * @return true if request is valid, false otherwise.
+   */
+  private static boolean checkResource(
+      ResourceInformation requestedRI, Resource availableResource) {
+    final ResourceInformation availableRI =
+        availableResource.getResourceInformation(requestedRI.getName());
 
-      if (requestedResourceValue > availableResourceValue) {
-        throwInvalidResourceException(reqResource, availableResource,
-            reqResourceName);
-      }
+    long requestedResourceValue = requestedRI.getValue();
+    long availableResourceValue = availableRI.getValue();
+    int unitsRelation = UnitsConversionUtil.compareUnits(requestedRI.getUnits(),
+        availableRI.getUnits());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Requested resource information: " + requestedRI);
+      LOG.debug("Available resource information: " + availableRI);
+      LOG.debug("Relation of units: " + unitsRelation);
+    }
+
+    // requested resource unit is less than available resource unit
+    // (e.g. requestedUnit: "m", availableUnit: "K")
+    if (unitsRelation < 0) {
+      availableResourceValue =
+          UnitsConversionUtil.convert(availableRI.getUnits(),
+              requestedRI.getUnits(), availableRI.getValue());
+
+      // requested resource unit is greater than available resource unit
+      // (e.g. requestedUnit: "G", availableUnit: "M")
+    } else if (unitsRelation > 0) {
+      requestedResourceValue =
+          UnitsConversionUtil.convert(requestedRI.getUnits(),
+              availableRI.getUnits(), requestedRI.getValue());
     }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Requested resource value after conversion: "
+          + requestedResourceValue);
+      LOG.debug("Available resource value after conversion: "
+          + availableResourceValue);
+    }
+
+    return requestedResourceValue <= availableResourceValue;
   }
 
   private static void throwInvalidResourceException(Resource reqResource,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f48fec83/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 0305702..281aded 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -459,7 +459,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       // Add it to allContainers list.
       addToNewlyAllocatedContainers(node, rmContainer);
       liveContainers.put(container.getId(), rmContainer);
-
       // Update consumption and track allocations
       ContainerRequest containerRequest = appSchedulingInfo.allocate(
           type, node, schedulerKey, container);
@@ -867,6 +866,12 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
         if (reserved) {
           unreserve(schedulerKey, node);
         }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format(
+              "Resource ask %s fits in available node resources %s, " +
+                      "but no container was allocated",
+              capability, available));
+        }
         return Resources.none();
       }
 
@@ -1096,7 +1101,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     } else if (!getQueue().fitsInMaxShare(resource)) {
       // The requested container must fit in queue maximum share
       updateAMDiagnosticMsg(resource,
-          " exceeds current queue or its parents maximum resource allowed).");
+          " exceeds current queue or its parents maximum resource allowed). " +
+                  "Max share of queue: " + getQueue().getMaxShare());
 
       ret = false;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f48fec83/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
index a8e53fc..26c5630 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
@@ -182,6 +182,9 @@ public class FSParentQueue extends FSQueue {
 
     // If this queue is over its limit, reject
     if (!assignContainerPreCheck(node)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Assign container precheck on node " + node + " failed");
+      }
       return assigned;
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f48fec83/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 1f85814..1c4bd51 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -42,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions
+        .SchedulerInvalidResoureRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
@@ -73,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils.MaxResourceValidationResult;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -449,10 +453,7 @@ public class FairScheduler extends
       String message =
           "Reject application " + applicationId + " submitted by user " + user
               + " with an empty queue name.";
-      LOG.info(message);
-      rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
-              message));
+      rejectApplicationWithMessage(applicationId, message);
       return;
     }
 
@@ -461,10 +462,7 @@ public class FairScheduler extends
           "Reject application " + applicationId + " submitted by user " + user
               + " with an illegal queue name " + queueName + ". "
               + "The queue name cannot start/end with period.";
-      LOG.info(message);
-      rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
-              message));
+      rejectApplicationWithMessage(applicationId, message);
       return;
     }
 
@@ -476,6 +474,31 @@ public class FairScheduler extends
         return;
       }
 
+      if (rmApp != null && rmApp.getAMResourceRequests() != null) {
+        // Resources.fitsIn would always return false when queueMaxShare is 0
+        // for any resource, but only using Resources.fitsIn is not enough
+        // as it would also return false for such cases when the requested
+        // resource is greater than the max resource but that max resource is
+        // not zero, e.g. requested vCores = 2, max vCores = 1.
+        // With this check, we only reject those applications where resource
+        // requested is greater than 0 and we have 0
+        // of that resource on the queue.
+        List<MaxResourceValidationResult> invalidAMResourceRequests =
+                validateResourceRequests(rmApp.getAMResourceRequests(), queue);
+
+        if (!invalidAMResourceRequests.isEmpty()) {
+          String msg = String.format(
+                  "Cannot submit application %s to queue %s because "
+                          + "it has zero amount of resource for a requested "
+                          + "resource! Invalid requested AM resources: %s, "
+                          + "maximum queue resources: %s",
+                  applicationId, queue.getName(),
+                  invalidAMResourceRequests, queue.getMaxShare());
+          rejectApplicationWithMessage(applicationId, msg);
+          return;
+        }
+      }
+
       // Enforce ACLs
       UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(
           user);
@@ -485,9 +508,7 @@ public class FairScheduler extends
         String msg = "User " + userUgi.getUserName()
             + " cannot submit applications to queue " + queue.getName()
             + "(requested queuename is " + queueName + ")";
-        LOG.info(msg);
-        rmContext.getDispatcher().getEventHandler().handle(
-            new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, msg));
+        rejectApplicationWithMessage(applicationId, msg);
         return;
       }
 
@@ -604,10 +625,7 @@ public class FairScheduler extends
     }
 
     if (appRejectMsg != null && rmApp != null) {
-      LOG.error(appRejectMsg);
-      rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppEvent(rmApp.getApplicationId(),
-              RMAppEventType.APP_REJECTED, appRejectMsg));
+      rejectApplicationWithMessage(rmApp.getApplicationId(), appRejectMsg);
       return null;
     }
 
@@ -834,7 +852,6 @@ public class FairScheduler extends
       List<ResourceRequest> ask, List<SchedulingRequest> schedulingRequests,
       List<ContainerId> release, List<String> blacklistAdditions,
       List<String> blacklistRemovals, ContainerUpdates updateRequests) {
-
     // Make sure this application exists
     FSAppAttempt application = getSchedulerApp(appAttemptId);
     if (application == null) {
@@ -854,6 +871,24 @@ public class FairScheduler extends
       return EMPTY_ALLOCATION;
     }
 
+    ApplicationId applicationId = application.getApplicationId();
+    FSLeafQueue queue = application.getQueue();
+    List<MaxResourceValidationResult> invalidAsks =
+            validateResourceRequests(ask, queue);
+
+    // We need to be fail-fast here if any invalid ask is detected.
+    // If we would have thrown exception later, this could be problematic as
+    // tokens and promoted / demoted containers would have been lost because
+    // scheduler would clear them right away and AM
+    // would not get this information.
+    if (!invalidAsks.isEmpty()) {
+      throw new SchedulerInvalidResoureRequestException(String.format(
+              "Resource request is invalid for application %s because queue %s "
+                      + "has 0 amount of resource for a resource type! "
+                      + "Validation result: %s",
+              applicationId, queue.getName(), invalidAsks));
+    }
+
     // Handle promotions and demotions
     handleContainerUpdates(application, updateRequests);
 
@@ -912,6 +947,7 @@ public class FairScheduler extends
 
     Resource headroom = application.getHeadroom();
     application.setApplicationHeadroomForMetrics(headroom);
+
     return new Allocation(newlyAllocatedContainers, headroom,
         preemptionContainerIds, null, null,
         application.pullUpdatedNMTokens(), null, null,
@@ -920,6 +956,34 @@ public class FairScheduler extends
         application.pullPreviousAttemptContainers());
   }
 
+  private List<MaxResourceValidationResult> validateResourceRequests(
+      List<ResourceRequest> requests, FSLeafQueue queue) {
+    List<MaxResourceValidationResult> validationResults = Lists.newArrayList();
+    // Only failed validations are collected; an empty list means all passed.
+    for (ResourceRequest resourceRequest : requests) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Validating resource request: " + resourceRequest);
+      }
+
+      MaxResourceValidationResult validationResult =
+              SchedulerUtils.validateResourceRequestsAgainstQueueMaxResource(
+                      resourceRequest, queue.getMaxShare());
+      if (!validationResult.isValid()) {
+        validationResults.add(validationResult);
+        LOG.warn(String.format("Queue %s cannot handle resource request " +
+                        "because it has zero available amount of resource " +
+                        "for a requested resource type, " +
+                        "so the resource request is ignored!"
+                        + " Requested resources: %s, " +
+                        "maximum queue resources: %s",
+                queue.getName(), resourceRequest.getCapability(),
+                queue.getMaxShare()));
+      }
+    }
+
+    return validationResults;
+  }
+
   @Override
   protected void nodeUpdate(RMNode nm) {
     try {
@@ -1060,9 +1124,14 @@ public class FairScheduler extends
         Resource assignedResource = Resources.clone(Resources.none());
         Resource maxResourcesToAssign = Resources.multiply(
             node.getUnallocatedResource(), 0.5f);
+
         while (node.getReservedContainer() == null) {
           Resource assignment = queueMgr.getRootQueue().assignContainer(node);
+
           if (assignment.equals(Resources.none())) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("No container is allocated on node " + node);
+            }
             break;
           }
 
@@ -1254,9 +1323,7 @@ public class FairScheduler extends
           String message = "Application " + applicationId
               + " submitted to a reservation which is not yet "
               + "currently active: " + resQName;
-          this.rmContext.getDispatcher().getEventHandler().handle(
-              new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
-                  message));
+          rejectApplicationWithMessage(applicationId, message);
           return null;
         }
         if (!queue.getParent().getQueueName().equals(queueName)) {
@@ -1264,9 +1331,7 @@ public class FairScheduler extends
               "Application: " + applicationId + " submitted to a reservation "
                   + resQName + " which does not belong to the specified queue: "
                   + queueName;
-          this.rmContext.getDispatcher().getEventHandler().handle(
-              new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
-                  message));
+          rejectApplicationWithMessage(applicationId, message);
           return null;
         }
         // use the reservation queue to run the app
@@ -1279,7 +1344,13 @@ public class FairScheduler extends
     } finally {
       readLock.unlock();
     }
+  }
 
+  private void rejectApplicationWithMessage(ApplicationId applicationId,
+          String msg) {
+    LOG.info(msg);
+    rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent(
+            applicationId, RMAppEventType.APP_REJECTED, msg));
   }
 
   private String getDefaultQueueForPlanQueue(String queueName) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f48fec83/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index b998564..3ac3849 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -57,6 +58,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 public class FairSchedulerTestBase {
@@ -163,37 +165,43 @@ public class FairSchedulerTestBase {
   protected ApplicationAttemptId createSchedulingRequest(
       int memory, int vcores, String queueId, String userId, int numContainers,
       int priority) {
-    ApplicationAttemptId id = createAppAttemptId(this.APP_ID++,
-        this.ATTEMPT_ID++);
+    ResourceRequest request = createResourceRequest(memory, vcores,
+            ResourceRequest.ANY, priority, numContainers, true);
+    return createSchedulingRequest(Lists.newArrayList(request), queueId,
+            userId);
+  }
+
+  protected ApplicationAttemptId createSchedulingRequest(
+      Collection<ResourceRequest> requests, String queueId, String userId) {
+    ApplicationAttemptId id =
+        createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
     scheduler.addApplication(id.getApplicationId(), queueId, userId, false);
     // This conditional is for testAclSubmitApplication where app is rejected
     // and no app is added.
-    if (scheduler.getSchedulerApplications().
-        containsKey(id.getApplicationId())) {
+    if (scheduler.getSchedulerApplications()
+        .containsKey(id.getApplicationId())) {
       scheduler.addApplicationAttempt(id, false, false);
     }
-    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
-    ResourceRequest request = createResourceRequest(memory, vcores,
-        ResourceRequest.ANY, priority, numContainers, true);
-    ask.add(request);
+
+    List<ResourceRequest> ask = new ArrayList<>(requests);
 
     RMApp rmApp = mock(RMApp.class);
     RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);
     when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
     when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn(
-        new RMAppAttemptMetrics(id, resourceManager.getRMContext()));
+            new RMAppAttemptMetrics(id, resourceManager.getRMContext()));
     ApplicationSubmissionContext submissionContext =
-        mock(ApplicationSubmissionContext.class);
+            mock(ApplicationSubmissionContext.class);
     when(submissionContext.getUnmanagedAM()).thenReturn(false);
     when(rmAppAttempt.getSubmissionContext()).thenReturn(submissionContext);
     when(rmApp.getApplicationSubmissionContext()).thenReturn(submissionContext);
     Container container = mock(Container.class);
     when(rmAppAttempt.getMasterContainer()).thenReturn(container);
     resourceManager.getRMContext().getRMApps()
-        .put(id.getApplicationId(), rmApp);
+            .put(id.getApplicationId(), rmApp);
 
-    scheduler.allocate(id, ask, null, new ArrayList<ContainerId>(),
-        null, null, NULL_UPDATE_REQUESTS);
+    scheduler.allocate(id, ask, null, new ArrayList<>(),
+            null, null, NULL_UPDATE_REQUESTS);
     scheduler.update();
     return id;
   }
@@ -252,13 +260,36 @@ public class FairSchedulerTestBase {
 
   protected void createApplicationWithAMResource(ApplicationAttemptId attId,
       String queue, String user, Resource amResource) {
+    createApplicationWithAMResourceInternal(attId, queue, user, amResource,
+        null);
+    ApplicationId appId = attId.getApplicationId();
+    addApplication(queue, user, appId);
+    addAppAttempt(attId);
+  }
+
+  protected void createApplicationWithAMResource(ApplicationAttemptId attId,
+      String queue, String user, Resource amResource,
+      List<ResourceRequest> amReqs) {
+    createApplicationWithAMResourceInternal(attId, queue, user, amResource,
+        amReqs);
+    ApplicationId appId = attId.getApplicationId();
+    addApplication(queue, user, appId);
+  }
+
+  private void createApplicationWithAMResourceInternal(
+      ApplicationAttemptId attId, String queue, String user,
+      Resource amResource, List<ResourceRequest> amReqs) {
     RMContext rmContext = resourceManager.getRMContext();
     ApplicationId appId = attId.getApplicationId();
     RMApp rmApp = new RMAppImpl(appId, rmContext, conf, null, user, null,
         ApplicationSubmissionContext.newInstance(appId, null, queue, null,
             mock(ContainerLaunchContext.class), false, false, 0, amResource,
-            null), scheduler, null, 0, null, null, null);
+            null),
+        scheduler, null, 0, null, null, amReqs);
     rmContext.getRMApps().put(appId, rmApp);
+  }
+
+  private void addApplication(String queue, String user, ApplicationId appId) {
     RMAppEvent event = new RMAppEvent(appId, RMAppEventType.START);
     resourceManager.getRMContext().getRMApps().get(appId).handle(event);
     event = new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED);
@@ -268,8 +299,11 @@ public class FairSchedulerTestBase {
     AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(
         appId, queue, user);
     scheduler.handle(appAddedEvent);
+  }
+
+  private void addAppAttempt(ApplicationAttemptId attId) {
     AppAttemptAddedSchedulerEvent attempAddedEvent =
-        new AppAttemptAddedSchedulerEvent(attId, false);
+            new AppAttemptAddedSchedulerEvent(attId, false);
     scheduler.handle(attempAddedEvent);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f48fec83/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index d9c06a7..2f6c2cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -41,9 +41,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import javax.xml.parsers.ParserConfigurationException;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ha.HAServiceProtocol;
@@ -62,6 +64,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -69,6 +72,8 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions
+        .SchedulerInvalidResoureRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
@@ -5414,4 +5419,204 @@ public class TestFairScheduler extends FairSchedulerTestBase {
             SchedulerUtils.COMPLETED_APPLICATION),
         RMContainerEventType.EXPIRE);
   }
+
+  /**
+   * Runs the application-rejection scenario against a queue configured
+   * with zero vcores.
+   */
+  @Test
+  public void testAppRejectedToQueueWithZeroCapacityOfVcores()
+      throws IOException {
+    final String zeroedResource = ResourceInformation.VCORES_URI;
+    testAppRejectedToQueueWithZeroCapacityOfResource(zeroedResource);
+  }
+
+  /**
+   * Runs the application-rejection scenario against a queue configured
+   * with zero memory.
+   */
+  @Test
+  public void testAppRejectedToQueueWithZeroCapacityOfMemory()
+      throws IOException {
+    final String zeroedResource = ResourceInformation.MEMORY_URI;
+    testAppRejectedToQueueWithZeroCapacityOfResource(zeroedResource);
+  }
+
+  /**
+   * Submits an application with an AM resource request to {@code queueA},
+   * whose min/max resources are generated with a zero amount of the given
+   * resource, and asserts that exactly one {@link RMAppEvent} is recorded,
+   * that it is of type {@code APP_REJECTED} and that its diagnostic message
+   * matches the expected pattern.
+   *
+   * @param resource the resource to zero out in the generated allocation
+   *     file ({@link ResourceInformation#MEMORY_URI} or
+   *     {@link ResourceInformation#VCORES_URI})
+   * @throws IOException if the allocation file cannot be written
+   */
+  private void testAppRejectedToQueueWithZeroCapacityOfResource(String resource)
+      throws IOException {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    generateAllocationFileWithZeroResource(resource);
+
+    final List<Event> recordedEvents = Lists.newArrayList();
+
+    // Hand the scheduler a spied RM context whose dispatcher is a mock; the
+    // mock's event handler records RMAppEvents only, so the assertions
+    // below see nothing but application-level events.
+    RMContext spyContext = Mockito.spy(resourceManager.getRMContext());
+    Dispatcher mockDispatcher = mock(AsyncDispatcher.class);
+    when(mockDispatcher.getEventHandler()).thenReturn((EventHandler) event -> {
+      if (event instanceof RMAppEvent) {
+        recordedEvents.add(event);
+      }
+    });
+    Mockito.doReturn(mockDispatcher).when(spyContext).getDispatcher();
+    ((AsyncDispatcher) mockDispatcher).start();
+
+    scheduler.setRMContext(spyContext);
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // submit app with queue name (queueA)
+    ApplicationAttemptId appAttemptId1 = createAppAttemptId(1, 1);
+
+    ResourceRequest amReqs = ResourceRequest.newBuilder()
+        .capability(Resource.newInstance(5 * GB, 3)).build();
+    createApplicationWithAMResource(appAttemptId1, "queueA", "user1",
+        Resource.newInstance(GB, 1), Lists.newArrayList(amReqs));
+    scheduler.update();
+
+    assertEquals("Exactly one APP_REJECTED event is expected", 1,
+        recordedEvents.size());
+    Event event = recordedEvents.get(0);
+    RMAppEvent rmAppEvent = (RMAppEvent) event;
+    assertEquals(RMAppEventType.APP_REJECTED, rmAppEvent.getType());
+
+    // The dot in the queue name is escaped so that it only matches a
+    // literal '.', as in the pattern used by the scheduling-rejected test.
+    final String expectedDiagnostics =
+        "Cannot submit application application[\\d_]+ to queue "
+            + "root\\.queueA because it has zero amount of resource "
+            + "for a requested resource! "
+            + "Invalid requested AM resources: .+, "
+            + "maximum queue resources: .+";
+    assertTrue("Diagnostic message does not match: "
+        + rmAppEvent.getDiagnosticMsg(),
+        rmAppEvent.getDiagnosticMsg().matches(expectedDiagnostics));
+  }
+
+  /**
+   * Writes an allocation file to {@code ALLOC_FILE} that defines two
+   * queues. {@code queueA} gets identical min and max resources in which
+   * the given resource is zero, and {@code queueB} gets a small minimum and
+   * a weight of 0.0.
+   *
+   * @param resource {@link ResourceInformation#MEMORY_URI} to zero out
+   *     memory or {@link ResourceInformation#VCORES_URI} to zero out
+   *     vcores; for any other value the resources of {@code queueA} are
+   *     written as an empty string
+   * @throws IOException if the allocation file cannot be opened for writing
+   */
+  private void generateAllocationFileWithZeroResource(String resource)
+      throws IOException {
+    String resources = "";
+    if (resource.equals(ResourceInformation.MEMORY_URI)) {
+      resources = "0 mb,2vcores";
+    } else if (resource.equals(ResourceInformation.VCORES_URI)) {
+      resources = "10000 mb,0vcores";
+    }
+
+    // try-with-resources closes the writer even if writing fails midway.
+    try (PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE))) {
+      out.println("<?xml version=\"1.0\"?>");
+      out.println("<allocations>");
+      out.println("<queue name=\"queueA\">");
+      out.println("<minResources>" + resources + "</minResources>");
+      out.println("<maxResources>" + resources + "</maxResources>");
+      out.println("<weight>2.0</weight>");
+      out.println("</queue>");
+      out.println("<queue name=\"queueB\">");
+      out.println("<minResources>1 mb 1 vcores</minResources>");
+      out.println("<weight>0.0</weight>");
+      out.println("</queue>");
+      out.println("</allocations>");
+    }
+  }
+
+  /**
+   * Runs the scheduling-rejection scenario against a queue with zero
+   * memory, using one request that asks for memory and one that does not.
+   */
+  @Test
+  public void testSchedulingRejectedToQueueWithZeroCapacityOfMemory()
+      throws IOException {
+    // Invalid: the queue will have 0 capacity of memory, yet this request
+    // asks for 2048M.
+    ResourceRequest memoryAsk =
+        createResourceRequest(2048, 2, ResourceRequest.ANY, 1, 2, true);
+    // Valid: asks for nothing of either resource.
+    ResourceRequest emptyAsk =
+        createResourceRequest(0, 0, ResourceRequest.ANY, 1, 2, true);
+
+    List<ResourceRequest> asks = Lists.newArrayList(memoryAsk, emptyAsk);
+    testSchedulingRejectedToQueueZeroCapacityOfResource(
+        ResourceInformation.MEMORY_URI, asks);
+  }
+
+  /**
+   * Runs the scheduling-allowed scenario against a queue with zero memory,
+   * requesting 0 memory and 2 vcores.
+   */
+  @Test
+  public void testSchedulingAllowedToQueueWithZeroCapacityOfMemory()
+      throws IOException {
+    final int memory = 0;
+    final int vCores = 2;
+    testSchedulingAllowedToQueueZeroCapacityOfResource(
+        ResourceInformation.MEMORY_URI, memory, vCores);
+  }
+
+  /**
+   * Runs the scheduling-rejection scenario against a queue with zero
+   * vcores, using one request that asks for a vcore and one that does not.
+   */
+  @Test
+  public void testSchedulingRejectedToQueueWithZeroCapacityOfVcores()
+      throws IOException {
+    // Invalid: the queue will have 0 capacity of vCores, yet this request
+    // asks for 1.
+    ResourceRequest vcoreAsk =
+        createResourceRequest(0, 1, ResourceRequest.ANY, 1, 2, true);
+    // Valid: asks for nothing of either resource.
+    ResourceRequest emptyAsk =
+        createResourceRequest(0, 0, ResourceRequest.ANY, 1, 2, true);
+
+    List<ResourceRequest> asks = Lists.newArrayList(vcoreAsk, emptyAsk);
+    testSchedulingRejectedToQueueZeroCapacityOfResource(
+        ResourceInformation.VCORES_URI, asks);
+  }
+
+  /**
+   * Runs the scheduling-allowed scenario against a queue with zero vcores,
+   * requesting 2048 memory and 0 vcores.
+   */
+  @Test
+  public void testSchedulingAllowedToQueueWithZeroCapacityOfVcores()
+      throws IOException {
+    final int memory = 2048;
+    final int vCores = 0;
+    testSchedulingAllowedToQueueZeroCapacityOfResource(
+        ResourceInformation.VCORES_URI, memory, vCores);
+  }
+
+  /**
+   * Sends the given resource requests to {@code queueA}, which is
+   * configured with a zero amount of the given resource, and expects a
+   * {@link SchedulerInvalidResoureRequestException}. After the exception
+   * it asserts that the queue holds exactly one application attempt, that
+   * the attempt has a scheduler app with scheduling info, and that no
+   * resource request was accepted.
+   *
+   * @param resource the resource to zero out in the generated allocation
+   *     file ({@link ResourceInformation#MEMORY_URI} or
+   *     {@link ResourceInformation#VCORES_URI})
+   * @param requests the resource requests to submit
+   * @throws IOException if the allocation file cannot be written
+   */
+  private void testSchedulingRejectedToQueueZeroCapacityOfResource(
+      String resource, Collection<ResourceRequest> requests)
+      throws IOException {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    generateAllocationFileWithZeroResource(resource);
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add a node
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 2));
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    try {
+      createSchedulingRequest(requests, "queueA", "user1");
+      fail("Exception is expected because the queue has zero capacity of "
+          + resource + " and requested resource capabilities are: "
+          + requests.stream().map(ResourceRequest::getCapability)
+              .collect(Collectors.toList()));
+    } catch (SchedulerInvalidResoureRequestException e) {
+      assertTrue(
+          "The thrown exception is not the expected one. Exception message: "
+              + e.getMessage(),
+          e.getMessage()
+              .matches("Resource request is invalid for application "
+                  + "application[\\d_]+ because queue root\\.queueA has 0 "
+                  + "amount of resource for a resource type! "
+                  + "Validation result:.*"));
+
+      List<ApplicationAttemptId> appsInQueue =
+          scheduler.getAppsInQueue("queueA");
+      assertEquals("Number of apps in queue 'queueA' should be one!", 1,
+          appsInQueue.size());
+
+      // Reuse the list and the scheduler app fetched here instead of
+      // looking each of them up a second time.
+      ApplicationAttemptId appAttemptId = appsInQueue.get(0);
+      FSAppAttempt schedulerApp = scheduler.getSchedulerApp(appAttemptId);
+      assertNotNull(
+          "Scheduler app for appAttemptId " + appAttemptId
+              + " should not be null!",
+          schedulerApp);
+      assertNotNull("Scheduler app queueInfo for appAttemptId " + appAttemptId
+          + " should not be null!", schedulerApp.getAppSchedulingInfo());
+
+      assertTrue("There should be no requests accepted", schedulerApp
+          .getAppSchedulingInfo().getAllResourceRequests().isEmpty());
+    }
+  }
+
+  /**
+   * Sends a scheduling request with the given memory and vcores to
+   * {@code queueA}, which is configured with a zero amount of the given
+   * resource. The method contains no assertions of its own; it completes
+   * normally only if nothing on the way throws.
+   *
+   * @param resource the resource to zero out in the generated allocation
+   *     file ({@link ResourceInformation#MEMORY_URI} or
+   *     {@link ResourceInformation#VCORES_URI})
+   * @param memory memory amount of the scheduling request
+   * @param vCores vcores amount of the scheduling request
+   * @throws IOException if the allocation file cannot be written
+   */
+  private void testSchedulingAllowedToQueueZeroCapacityOfResource(
+      String resource, int memory, int vCores) throws IOException {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    generateAllocationFileWithZeroResource(resource);
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Register a single node with the scheduler.
+    RMNode node = MockNodes.newNodeInfo(1, Resources.createResource(2048, 2));
+    scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+    createSchedulingRequest(memory, vCores, "queueA", "user1", 1, 2);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org