You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2018/05/22 20:15:00 UTC
[32/50] [abbrv] hadoop git commit: YARN-8248. Job hangs when a job
requests a resource that its queue does not have. (Szilard Nemeth via Haibo
Chen)
YARN-8248. Job hangs when a job requests a resource that its queue does not have. (Szilard Nemeth via Haibo Chen)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f48fec83
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f48fec83
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f48fec83
Branch: refs/heads/HDDS-48
Commit: f48fec83d0f2d1a781a141ad7216463c5526321f
Parents: 3d2d9db
Author: Haibo Chen <ha...@apache.org>
Authored: Mon May 21 08:00:21 2018 -0700
Committer: Haibo Chen <ha...@apache.org>
Committed: Mon May 21 08:10:41 2018 -0700
----------------------------------------------------------------------
.../scheduler/SchedulerUtils.java | 159 ++++++++++----
.../scheduler/fair/FSAppAttempt.java | 10 +-
.../scheduler/fair/FSParentQueue.java | 3 +
.../scheduler/fair/FairScheduler.java | 115 +++++++++--
.../scheduler/fair/FairSchedulerTestBase.java | 64 ++++--
.../scheduler/fair/TestFairScheduler.java | 205 +++++++++++++++++++
6 files changed, 481 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f48fec83/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index 9b3c20a..7de250d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -18,9 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -40,6 +45,8 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
+import org.apache.hadoop.yarn.exceptions
+ .SchedulerInvalidResoureRequestException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AccessType;
@@ -61,12 +68,37 @@ import org.apache.hadoop.yarn.util.resource.Resources;
@Unstable
public class SchedulerUtils {
+ /**
+ * This class contains invalid resource information along with its
+ * resource request.
+ */
+ public static class MaxResourceValidationResult {
+ private ResourceRequest resourceRequest;
+ private List<ResourceInformation> invalidResources;
+
+ MaxResourceValidationResult(ResourceRequest resourceRequest,
+ List<ResourceInformation> invalidResources) {
+ this.resourceRequest = resourceRequest;
+ this.invalidResources = invalidResources;
+ }
+
+ public boolean isValid() {
+ return invalidResources.isEmpty();
+ }
+
+ @Override
+ public String toString() {
+ return "MaxResourceValidationResult{" + "resourceRequest="
+ + resourceRequest + ", invalidResources=" + invalidResources + '}';
+ }
+ }
+
private static final Log LOG = LogFactory.getLog(SchedulerUtils.class);
- private static final RecordFactory recordFactory =
+ private static final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
- public static final String RELEASED_CONTAINER =
+ public static final String RELEASED_CONTAINER =
"Container released by application";
public static final String UPDATED_CONTAINER =
@@ -325,6 +357,22 @@ public class SchedulerUtils {
}
}
+ private static Map<String, ResourceInformation> getZeroResources(
+ Resource resource) {
+ Map<String, ResourceInformation> resourceInformations = Maps.newHashMap();
+ int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+
+ for (int i = 0; i < maxLength; i++) {
+ ResourceInformation resourceInformation =
+ resource.getResourceInformation(i);
+ if (resourceInformation.getValue() == 0L) {
+ resourceInformations.put(resourceInformation.getName(),
+ resourceInformation);
+ }
+ }
+ return resourceInformations;
+ }
+
@Private
@VisibleForTesting
static void checkResourceRequestAgainstAvailableResource(Resource reqResource,
@@ -339,47 +387,86 @@ public class SchedulerUtils {
reqResourceName);
}
- final ResourceInformation availableRI =
- availableResource.getResourceInformation(reqResourceName);
+ boolean valid = checkResource(requestedRI, availableResource);
+ if (!valid) {
+ throwInvalidResourceException(reqResource, availableResource,
+ reqResourceName);
+ }
+ }
+ }
- long requestedResourceValue = requestedRI.getValue();
- long availableResourceValue = availableRI.getValue();
- int unitsRelation = UnitsConversionUtil
- .compareUnits(requestedRI.getUnits(), availableRI.getUnits());
+ public static MaxResourceValidationResult
+ validateResourceRequestsAgainstQueueMaxResource(
+ ResourceRequest resReq, Resource availableResource)
+ throws SchedulerInvalidResoureRequestException {
+ final Resource reqResource = resReq.getCapability();
+ Map<String, ResourceInformation> resourcesWithZeroAmount =
+ getZeroResources(availableResource);
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Resources with zero amount: "
+ + Arrays.toString(resourcesWithZeroAmount.entrySet().toArray()));
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Requested resource information: " + requestedRI);
- LOG.debug("Available resource information: " + availableRI);
- LOG.debug("Relation of units: " + unitsRelation);
- }
+ List<ResourceInformation> invalidResources = Lists.newArrayList();
+ for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) {
+ final ResourceInformation requestedRI =
+ reqResource.getResourceInformation(i);
+ final String reqResourceName = requestedRI.getName();
- // requested resource unit is less than available resource unit
- // e.g. requestedUnit: "m", availableUnit: "K")
- if (unitsRelation < 0) {
- availableResourceValue =
- UnitsConversionUtil.convert(availableRI.getUnits(),
- requestedRI.getUnits(), availableRI.getValue());
-
- // requested resource unit is greater than available resource unit
- // e.g. requestedUnit: "G", availableUnit: "M")
- } else if (unitsRelation > 0) {
- requestedResourceValue =
- UnitsConversionUtil.convert(requestedRI.getUnits(),
- availableRI.getUnits(), requestedRI.getValue());
+ if (resourcesWithZeroAmount.containsKey(reqResourceName)
+ && requestedRI.getValue() > 0) {
+ invalidResources.add(requestedRI);
}
+ }
+ return new MaxResourceValidationResult(resReq, invalidResources);
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Requested resource value after conversion: " +
- requestedResourceValue);
- LOG.info("Available resource value after conversion: " +
- availableResourceValue);
- }
+ /**
+ * Checks requested ResourceInformation against available Resource.
+ * @param requestedRI the requested resource information to check
+ * @param availableResource the resource the request is compared against
+ * @return true if request is valid, false otherwise.
+ */
+ private static boolean checkResource(
+ ResourceInformation requestedRI, Resource availableResource) {
+ final ResourceInformation availableRI =
+ availableResource.getResourceInformation(requestedRI.getName());
- if (requestedResourceValue > availableResourceValue) {
- throwInvalidResourceException(reqResource, availableResource,
- reqResourceName);
- }
+ long requestedResourceValue = requestedRI.getValue();
+ long availableResourceValue = availableRI.getValue();
+ int unitsRelation = UnitsConversionUtil.compareUnits(requestedRI.getUnits(),
+ availableRI.getUnits());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Requested resource information: " + requestedRI);
+ LOG.debug("Available resource information: " + availableRI);
+ LOG.debug("Relation of units: " + unitsRelation);
+ }
+
+ // requested resource unit is less than available resource unit
+ // e.g. requestedUnit: "m", availableUnit: "K")
+ if (unitsRelation < 0) {
+ availableResourceValue =
+ UnitsConversionUtil.convert(availableRI.getUnits(),
+ requestedRI.getUnits(), availableRI.getValue());
+
+ // requested resource unit is greater than available resource unit
+ // e.g. requestedUnit: "G", availableUnit: "M")
+ } else if (unitsRelation > 0) {
+ requestedResourceValue =
+ UnitsConversionUtil.convert(requestedRI.getUnits(),
+ availableRI.getUnits(), requestedRI.getValue());
}
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Requested resource value after conversion: "
+ + requestedResourceValue);
+ LOG.info("Available resource value after conversion: "
+ + availableResourceValue);
+ }
+
+ return requestedResourceValue <= availableResourceValue;
}
private static void throwInvalidResourceException(Resource reqResource,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f48fec83/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 0305702..281aded 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -459,7 +459,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
// Add it to allContainers list.
addToNewlyAllocatedContainers(node, rmContainer);
liveContainers.put(container.getId(), rmContainer);
-
// Update consumption and track allocations
ContainerRequest containerRequest = appSchedulingInfo.allocate(
type, node, schedulerKey, container);
@@ -867,6 +866,12 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
if (reserved) {
unreserve(schedulerKey, node);
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format(
+ "Resource ask %s fits in available node resources %s, " +
+ "but no container was allocated",
+ capability, available));
+ }
return Resources.none();
}
@@ -1096,7 +1101,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
} else if (!getQueue().fitsInMaxShare(resource)) {
// The requested container must fit in queue maximum share
updateAMDiagnosticMsg(resource,
- " exceeds current queue or its parents maximum resource allowed).");
+ " exceeds current queue or its parents maximum resource allowed). " +
+ "Max share of queue: " + getQueue().getMaxShare());
ret = false;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f48fec83/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
index a8e53fc..26c5630 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
@@ -182,6 +182,9 @@ public class FSParentQueue extends FSQueue {
// If this queue is over its limit, reject
if (!assignContainerPreCheck(node)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Assign container precheck on node " + node + " failed");
+ }
return assigned;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f48fec83/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 1f85814..1c4bd51 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -42,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions
+ .SchedulerInvalidResoureRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
@@ -73,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils.MaxResourceValidationResult;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -449,10 +453,7 @@ public class FairScheduler extends
String message =
"Reject application " + applicationId + " submitted by user " + user
+ " with an empty queue name.";
- LOG.info(message);
- rmContext.getDispatcher().getEventHandler().handle(
- new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
- message));
+ rejectApplicationWithMessage(applicationId, message);
return;
}
@@ -461,10 +462,7 @@ public class FairScheduler extends
"Reject application " + applicationId + " submitted by user " + user
+ " with an illegal queue name " + queueName + ". "
+ "The queue name cannot start/end with period.";
- LOG.info(message);
- rmContext.getDispatcher().getEventHandler().handle(
- new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
- message));
+ rejectApplicationWithMessage(applicationId, message);
return;
}
@@ -476,6 +474,31 @@ public class FairScheduler extends
return;
}
+ if (rmApp != null && rmApp.getAMResourceRequests() != null) {
+ // Resources.fitsIn would always return false when queueMaxShare is 0
+ // for any resource, but only using Resources.fitsIn is not enough
+ // as it would also return false for such cases when the requested
+ // resource is greater than the max resource but that max resource is
+ // not zero, e.g. requested vCores = 2, max vCores = 1.
+ // With this check, we only reject those applications where the
+ // requested amount of a resource is greater than 0 and the queue
+ // has 0 of that resource.
+ List<MaxResourceValidationResult> invalidAMResourceRequests =
+ validateResourceRequests(rmApp.getAMResourceRequests(), queue);
+
+ if (!invalidAMResourceRequests.isEmpty()) {
+ String msg = String.format(
+ "Cannot submit application %s to queue %s because "
+ + "it has zero amount of resource for a requested "
+ + "resource! Invalid requested AM resources: %s, "
+ + "maximum queue resources: %s",
+ applicationId, queue.getName(),
+ invalidAMResourceRequests, queue.getMaxShare());
+ rejectApplicationWithMessage(applicationId, msg);
+ return;
+ }
+ }
+
// Enforce ACLs
UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(
user);
@@ -485,9 +508,7 @@ public class FairScheduler extends
String msg = "User " + userUgi.getUserName()
+ " cannot submit applications to queue " + queue.getName()
+ "(requested queuename is " + queueName + ")";
- LOG.info(msg);
- rmContext.getDispatcher().getEventHandler().handle(
- new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, msg));
+ rejectApplicationWithMessage(applicationId, msg);
return;
}
@@ -604,10 +625,7 @@ public class FairScheduler extends
}
if (appRejectMsg != null && rmApp != null) {
- LOG.error(appRejectMsg);
- rmContext.getDispatcher().getEventHandler().handle(
- new RMAppEvent(rmApp.getApplicationId(),
- RMAppEventType.APP_REJECTED, appRejectMsg));
+ rejectApplicationWithMessage(rmApp.getApplicationId(), appRejectMsg);
return null;
}
@@ -834,7 +852,6 @@ public class FairScheduler extends
List<ResourceRequest> ask, List<SchedulingRequest> schedulingRequests,
List<ContainerId> release, List<String> blacklistAdditions,
List<String> blacklistRemovals, ContainerUpdates updateRequests) {
-
// Make sure this application exists
FSAppAttempt application = getSchedulerApp(appAttemptId);
if (application == null) {
@@ -854,6 +871,24 @@ public class FairScheduler extends
return EMPTY_ALLOCATION;
}
+ ApplicationId applicationId = application.getApplicationId();
+ FSLeafQueue queue = application.getQueue();
+ List<MaxResourceValidationResult> invalidAsks =
+ validateResourceRequests(ask, queue);
+
+ // We need to fail fast here if any invalid ask is detected.
+ // If we threw the exception later, this could be problematic as
+ // tokens and promoted / demoted containers would be lost because the
+ // scheduler would clear them right away and the AM
+ // would not get this information.
+ if (!invalidAsks.isEmpty()) {
+ throw new SchedulerInvalidResoureRequestException(String.format(
+ "Resource request is invalid for application %s because queue %s "
+ + "has 0 amount of resource for a resource type! "
+ + "Validation result: %s",
+ applicationId, queue.getName(), invalidAsks));
+ }
+
// Handle promotions and demotions
handleContainerUpdates(application, updateRequests);
@@ -912,6 +947,7 @@ public class FairScheduler extends
Resource headroom = application.getHeadroom();
application.setApplicationHeadroomForMetrics(headroom);
+
return new Allocation(newlyAllocatedContainers, headroom,
preemptionContainerIds, null, null,
application.pullUpdatedNMTokens(), null, null,
@@ -920,6 +956,34 @@ public class FairScheduler extends
application.pullPreviousAttemptContainers());
}
+ private List<MaxResourceValidationResult> validateResourceRequests(
+ List<ResourceRequest> requests, FSLeafQueue queue) {
+ List<MaxResourceValidationResult> validationResults = Lists.newArrayList();
+
+ for (ResourceRequest resourceRequest : requests) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Validating resource request: " + resourceRequest);
+ }
+
+ MaxResourceValidationResult validationResult =
+ SchedulerUtils.validateResourceRequestsAgainstQueueMaxResource(
+ resourceRequest, queue.getMaxShare());
+ if (!validationResult.isValid()) {
+ validationResults.add(validationResult);
+ LOG.warn(String.format("Queue %s cannot handle resource request" +
+ "because it has zero available amount of resource " +
+ "for a requested resource type, " +
+ "so the resource request is ignored!"
+ + " Requested resources: %s, " +
+ "maximum queue resources: %s",
+ queue.getName(), resourceRequest.getCapability(),
+ queue.getMaxShare()));
+ }
+ }
+
+ return validationResults;
+ }
+
@Override
protected void nodeUpdate(RMNode nm) {
try {
@@ -1060,9 +1124,14 @@ public class FairScheduler extends
Resource assignedResource = Resources.clone(Resources.none());
Resource maxResourcesToAssign = Resources.multiply(
node.getUnallocatedResource(), 0.5f);
+
while (node.getReservedContainer() == null) {
Resource assignment = queueMgr.getRootQueue().assignContainer(node);
+
if (assignment.equals(Resources.none())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No container is allocated on node " + node);
+ }
break;
}
@@ -1254,9 +1323,7 @@ public class FairScheduler extends
String message = "Application " + applicationId
+ " submitted to a reservation which is not yet "
+ "currently active: " + resQName;
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
- message));
+ rejectApplicationWithMessage(applicationId, message);
return null;
}
if (!queue.getParent().getQueueName().equals(queueName)) {
@@ -1264,9 +1331,7 @@ public class FairScheduler extends
"Application: " + applicationId + " submitted to a reservation "
+ resQName + " which does not belong to the specified queue: "
+ queueName;
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
- message));
+ rejectApplicationWithMessage(applicationId, message);
return null;
}
// use the reservation queue to run the app
@@ -1279,7 +1344,13 @@ public class FairScheduler extends
} finally {
readLock.unlock();
}
+ }
+ private void rejectApplicationWithMessage(ApplicationId applicationId,
+ String msg) {
+ LOG.info(msg);
+ rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent(
+ applicationId, RMAppEventType.APP_REJECTED, msg));
}
private String getDefaultQueueForPlanQueue(String queueName) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f48fec83/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index b998564..3ac3849 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -57,6 +58,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
public class FairSchedulerTestBase {
@@ -163,37 +165,43 @@ public class FairSchedulerTestBase {
protected ApplicationAttemptId createSchedulingRequest(
int memory, int vcores, String queueId, String userId, int numContainers,
int priority) {
- ApplicationAttemptId id = createAppAttemptId(this.APP_ID++,
- this.ATTEMPT_ID++);
+ ResourceRequest request = createResourceRequest(memory, vcores,
+ ResourceRequest.ANY, priority, numContainers, true);
+ return createSchedulingRequest(Lists.newArrayList(request), queueId,
+ userId);
+ }
+
+ protected ApplicationAttemptId createSchedulingRequest(
+ Collection<ResourceRequest> requests, String queueId, String userId) {
+ ApplicationAttemptId id =
+ createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
scheduler.addApplication(id.getApplicationId(), queueId, userId, false);
// This conditional is for testAclSubmitApplication where app is rejected
// and no app is added.
- if (scheduler.getSchedulerApplications().
- containsKey(id.getApplicationId())) {
+ if (scheduler.getSchedulerApplications()
+ .containsKey(id.getApplicationId())) {
scheduler.addApplicationAttempt(id, false, false);
}
- List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
- ResourceRequest request = createResourceRequest(memory, vcores,
- ResourceRequest.ANY, priority, numContainers, true);
- ask.add(request);
+
+ List<ResourceRequest> ask = new ArrayList<>(requests);
RMApp rmApp = mock(RMApp.class);
RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);
when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn(
- new RMAppAttemptMetrics(id, resourceManager.getRMContext()));
+ new RMAppAttemptMetrics(id, resourceManager.getRMContext()));
ApplicationSubmissionContext submissionContext =
- mock(ApplicationSubmissionContext.class);
+ mock(ApplicationSubmissionContext.class);
when(submissionContext.getUnmanagedAM()).thenReturn(false);
when(rmAppAttempt.getSubmissionContext()).thenReturn(submissionContext);
when(rmApp.getApplicationSubmissionContext()).thenReturn(submissionContext);
Container container = mock(Container.class);
when(rmAppAttempt.getMasterContainer()).thenReturn(container);
resourceManager.getRMContext().getRMApps()
- .put(id.getApplicationId(), rmApp);
+ .put(id.getApplicationId(), rmApp);
- scheduler.allocate(id, ask, null, new ArrayList<ContainerId>(),
- null, null, NULL_UPDATE_REQUESTS);
+ scheduler.allocate(id, ask, null, new ArrayList<>(),
+ null, null, NULL_UPDATE_REQUESTS);
scheduler.update();
return id;
}
@@ -252,13 +260,36 @@ public class FairSchedulerTestBase {
protected void createApplicationWithAMResource(ApplicationAttemptId attId,
String queue, String user, Resource amResource) {
+ createApplicationWithAMResourceInternal(attId, queue, user, amResource,
+ null);
+ ApplicationId appId = attId.getApplicationId();
+ addApplication(queue, user, appId);
+ addAppAttempt(attId);
+ }
+
+ protected void createApplicationWithAMResource(ApplicationAttemptId attId,
+ String queue, String user, Resource amResource,
+ List<ResourceRequest> amReqs) {
+ createApplicationWithAMResourceInternal(attId, queue, user, amResource,
+ amReqs);
+ ApplicationId appId = attId.getApplicationId();
+ addApplication(queue, user, appId);
+ }
+
+ private void createApplicationWithAMResourceInternal(
+ ApplicationAttemptId attId, String queue, String user,
+ Resource amResource, List<ResourceRequest> amReqs) {
RMContext rmContext = resourceManager.getRMContext();
ApplicationId appId = attId.getApplicationId();
RMApp rmApp = new RMAppImpl(appId, rmContext, conf, null, user, null,
ApplicationSubmissionContext.newInstance(appId, null, queue, null,
mock(ContainerLaunchContext.class), false, false, 0, amResource,
- null), scheduler, null, 0, null, null, null);
+ null),
+ scheduler, null, 0, null, null, amReqs);
rmContext.getRMApps().put(appId, rmApp);
+ }
+
+ private void addApplication(String queue, String user, ApplicationId appId) {
RMAppEvent event = new RMAppEvent(appId, RMAppEventType.START);
resourceManager.getRMContext().getRMApps().get(appId).handle(event);
event = new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED);
@@ -268,8 +299,11 @@ public class FairSchedulerTestBase {
AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(
appId, queue, user);
scheduler.handle(appAddedEvent);
+ }
+
+ private void addAppAttempt(ApplicationAttemptId attId) {
AppAttemptAddedSchedulerEvent attempAddedEvent =
- new AppAttemptAddedSchedulerEvent(attId, false);
+ new AppAttemptAddedSchedulerEvent(attId, false);
scheduler.handle(attempAddedEvent);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f48fec83/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index d9c06a7..2f6c2cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -41,9 +41,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import javax.xml.parsers.ParserConfigurationException;
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol;
@@ -62,6 +64,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -69,6 +72,8 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions
+ .SchedulerInvalidResoureRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
@@ -5414,4 +5419,204 @@ public class TestFairScheduler extends FairSchedulerTestBase {
SchedulerUtils.COMPLETED_APPLICATION),
RMContainerEventType.EXPIRE);
}
+
+  /**
+   * Runs the application-rejection scenario against a queue that is
+   * generated with zero vcores.
+   */
+  @Test
+  public void testAppRejectedToQueueWithZeroCapacityOfVcores()
+      throws IOException {
+    final String zeroedResource = ResourceInformation.VCORES_URI;
+    testAppRejectedToQueueWithZeroCapacityOfResource(zeroedResource);
+  }
+
+  /**
+   * Runs the application-rejection scenario against a queue that is
+   * generated with zero memory.
+   */
+  @Test
+  public void testAppRejectedToQueueWithZeroCapacityOfMemory()
+      throws IOException {
+    final String zeroedResource = ResourceInformation.MEMORY_URI;
+    testAppRejectedToQueueWithZeroCapacityOfResource(zeroedResource);
+  }
+
+  /**
+   * Submits an application to queueA, which is generated with zero capacity
+   * of the given resource, with an AM request built from
+   * {@code Resource.newInstance(5 * GB, 3)}. Verifies that exactly one
+   * {@link RMAppEvent} is handed to the mocked dispatcher, that it is of
+   * type APP_REJECTED, and that its diagnostic message has the expected
+   * form.
+   *
+   * @param resource name of the resource type the queue has zero of; passed
+   *                 on to generateAllocationFileWithZeroResource
+   * @throws IOException if the allocation file cannot be written
+   */
+  private void testAppRejectedToQueueWithZeroCapacityOfResource(String resource)
+      throws IOException {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    generateAllocationFileWithZeroResource(resource);
+
+    final List<Event> recordedEvents = Lists.newArrayList();
+
+    RMContext spyContext = Mockito.spy(resourceManager.getRMContext());
+    // Declared with the concrete type so that start() needs no down-cast.
+    AsyncDispatcher mockDispatcher = mock(AsyncDispatcher.class);
+    // Only RMAppEvent instances are recorded; every other event handed to
+    // this handler is dropped.
+    when(mockDispatcher.getEventHandler()).thenReturn(
+        (EventHandler<Event>) event -> {
+          if (event instanceof RMAppEvent) {
+            recordedEvents.add(event);
+          }
+        });
+    Mockito.doReturn(mockDispatcher).when(spyContext).getDispatcher();
+    mockDispatcher.start();
+
+    scheduler.setRMContext(spyContext);
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // submit app with queue name (queueA)
+    ApplicationAttemptId appAttemptId1 = createAppAttemptId(1, 1);
+
+    ResourceRequest amReqs = ResourceRequest.newBuilder()
+        .capability(Resource.newInstance(5 * GB, 3)).build();
+    createApplicationWithAMResource(appAttemptId1, "queueA", "user1",
+        Resource.newInstance(GB, 1), Lists.newArrayList(amReqs));
+    scheduler.update();
+
+    assertEquals("Exactly one APP_REJECTED event is expected", 1,
+        recordedEvents.size());
+    // Safe cast: the handler above only records RMAppEvent instances.
+    RMAppEvent rmAppEvent = (RMAppEvent) recordedEvents.get(0);
+    assertEquals(RMAppEventType.APP_REJECTED, rmAppEvent.getType());
+
+    // The dot in the queue path is escaped so that it matches a literal '.'
+    // only, instead of any character.
+    String diagnostics = rmAppEvent.getDiagnosticMsg();
+    assertTrue("Diagnostic message does not match: " + diagnostics,
+        diagnostics
+            .matches("Cannot submit application application[\\d_]+ to queue "
+                + "root\\.queueA because it has zero amount of resource "
+                + "for a requested resource! "
+                + "Invalid requested AM resources: .+, "
+                + "maximum queue resources: .+"));
+  }
+
+  /**
+   * Writes an allocation file to ALLOC_FILE in which queueA has zero
+   * capacity of one resource type: both its minResources and maxResources
+   * are "0 mb,2vcores" for memory or "10000 mb,0vcores" for vcores. queueB
+   * is written with "1 mb 1 vcores" as minResources and a weight of 0.0.
+   *
+   * @param resource {@code ResourceInformation.MEMORY_URI} or
+   *                 {@code ResourceInformation.VCORES_URI}; for any other
+   *                 value the two resource elements of queueA are left empty
+   * @throws IOException if the allocation file cannot be opened for writing
+   */
+  private void generateAllocationFileWithZeroResource(String resource)
+      throws IOException {
+    String resources = "";
+    if (resource.equals(ResourceInformation.MEMORY_URI)) {
+      resources = "0 mb,2vcores";
+    } else if (resource.equals(ResourceInformation.VCORES_URI)) {
+      resources = "10000 mb,0vcores";
+    }
+
+    // try-with-resources closes the writer even when an unexpected
+    // exception is thrown while the file is being written.
+    try (PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE))) {
+      out.println("<?xml version=\"1.0\"?>");
+      out.println("<allocations>");
+      out.println("<queue name=\"queueA\">");
+      out.println("<minResources>" + resources + "</minResources>");
+      out.println("<maxResources>" + resources + "</maxResources>");
+      out.println("<weight>2.0</weight>");
+      out.println("</queue>");
+      out.println("<queue name=\"queueB\">");
+      out.println("<minResources>1 mb 1 vcores</minResources>");
+      out.println("<weight>0.0</weight>");
+      out.println("</queue>");
+      out.println("</allocations>");
+    }
+  }
+
+  /**
+   * Runs the scheduling-rejection scenario against a queue with zero memory
+   * capacity, using one request that asks for memory and one that does not.
+   */
+  @Test
+  public void testSchedulingRejectedToQueueWithZeroCapacityOfMemory()
+      throws IOException {
+    // Not valid: the queue will have 0 capacity of memory while this request
+    // asks for 2048M.
+    ResourceRequest oversizedAsk =
+        createResourceRequest(2048, 2, ResourceRequest.ANY, 1, 2, true);
+    // Built with (0, 0) capability arguments; the valid counterpart.
+    ResourceRequest emptyAsk =
+        createResourceRequest(0, 0, ResourceRequest.ANY, 1, 2, true);
+
+    List<ResourceRequest> asks = Lists.newArrayList(oversizedAsk, emptyAsk);
+    testSchedulingRejectedToQueueZeroCapacityOfResource(
+        ResourceInformation.MEMORY_URI, asks);
+  }
+
+  /**
+   * Runs the scheduling-allowed scenario against a queue with zero memory
+   * capacity, passing 0 as the memory argument and 2 as the vcores argument.
+   */
+  @Test
+  public void testSchedulingAllowedToQueueWithZeroCapacityOfMemory()
+      throws IOException {
+    final int memory = 0;
+    final int vCores = 2;
+    testSchedulingAllowedToQueueZeroCapacityOfResource(
+        ResourceInformation.MEMORY_URI, memory, vCores);
+  }
+
+  /**
+   * Runs the scheduling-rejection scenario against a queue with zero vcores
+   * capacity, using one request that asks for a vcore and one that does not.
+   */
+  @Test
+  public void testSchedulingRejectedToQueueWithZeroCapacityOfVcores()
+      throws IOException {
+    // Not valid: the queue will have 0 capacity of vCores while this request
+    // asks for 1.
+    ResourceRequest oversizedAsk =
+        createResourceRequest(0, 1, ResourceRequest.ANY, 1, 2, true);
+    // Built with (0, 0) capability arguments; the valid counterpart.
+    ResourceRequest emptyAsk =
+        createResourceRequest(0, 0, ResourceRequest.ANY, 1, 2, true);
+
+    List<ResourceRequest> asks = Lists.newArrayList(oversizedAsk, emptyAsk);
+    testSchedulingRejectedToQueueZeroCapacityOfResource(
+        ResourceInformation.VCORES_URI, asks);
+  }
+
+  /**
+   * Runs the scheduling-allowed scenario against a queue with zero vcores
+   * capacity, passing 2048 as the memory argument and 0 as the vcores
+   * argument.
+   */
+  @Test
+  public void testSchedulingAllowedToQueueWithZeroCapacityOfVcores()
+      throws IOException {
+    final int memory = 2048;
+    final int vCores = 0;
+    testSchedulingAllowedToQueueZeroCapacityOfResource(
+        ResourceInformation.VCORES_URI, memory, vCores);
+  }
+
+  /**
+   * Generates an allocation file in which queueA has zero capacity of the
+   * given resource, adds one node to the scheduler and then submits the
+   * given requests to queueA. The submission is expected to throw a
+   * {@link SchedulerInvalidResoureRequestException} with a matching message.
+   * Afterwards the application must still be the only one in queueA, and
+   * none of its resource requests may have been recorded.
+   *
+   * @param resource name of the resource type the queue has zero of; passed
+   *                 on to generateAllocationFileWithZeroResource
+   * @param requests resource requests submitted to queueA
+   * @throws IOException if the allocation file cannot be written
+   */
+  private void testSchedulingRejectedToQueueZeroCapacityOfResource(
+      String resource, Collection<ResourceRequest> requests)
+      throws IOException {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    generateAllocationFileWithZeroResource(resource);
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add a node
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 2));
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    try {
+      createSchedulingRequest(requests, "queueA", "user1");
+      fail("Exception is expected because the queue has zero capacity of "
+          + resource + " and requested resource capabilities are: "
+          + requests.stream().map(ResourceRequest::getCapability)
+              .collect(Collectors.toList()));
+    } catch (SchedulerInvalidResoureRequestException e) {
+      assertTrue(
+          "The thrown exception is not the expected one. Exception message: "
+              + e.getMessage(),
+          e.getMessage()
+              .matches("Resource request is invalid for application "
+                  + "application[\\d_]+ because queue root\\.queueA has 0 "
+                  + "amount of resource for a resource type! "
+                  + "Validation result:.*"));
+    }
+
+    // Only reached once the expected exception has been caught: fail()
+    // raises an AssertionError, which the catch clause does not intercept.
+    List<ApplicationAttemptId> appsInQueue =
+        scheduler.getAppsInQueue("queueA");
+    assertEquals("Number of apps in queue 'queueA' should be one!", 1,
+        appsInQueue.size());
+
+    // Reuse the list and the attempt looked up once instead of querying the
+    // scheduler again for each assertion.
+    ApplicationAttemptId appAttemptId = appsInQueue.get(0);
+    FSAppAttempt schedulerApp = scheduler.getSchedulerApp(appAttemptId);
+    assertNotNull(
+        "Scheduler app for appAttemptId " + appAttemptId
+            + " should not be null!",
+        schedulerApp);
+    assertNotNull("Scheduler app scheduling info for appAttemptId "
+        + appAttemptId + " should not be null!",
+        schedulerApp.getAppSchedulingInfo());
+
+    assertTrue("There should be no requests accepted", schedulerApp
+        .getAppSchedulingInfo().getAllResourceRequests().isEmpty());
+  }
+
+  /**
+   * Generates an allocation file in which queueA has zero capacity of the
+   * given resource, adds one node to the scheduler and submits a scheduling
+   * request with the given memory and vcores to queueA. The method makes no
+   * assertion of its own; it passes if nothing is thrown.
+   *
+   * @param resource name of the resource type the queue has zero of; passed
+   *                 on to generateAllocationFileWithZeroResource
+   * @param memory   memory argument of the scheduling request
+   * @param vCores   vcores argument of the scheduling request
+   * @throws IOException if the allocation file cannot be written
+   */
+  private void testSchedulingAllowedToQueueZeroCapacityOfResource(
+      String resource, int memory, int vCores) throws IOException {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    generateAllocationFileWithZeroResource(resource);
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Register a single node created from a (2048, 2) resource.
+    scheduler.handle(new NodeAddedSchedulerEvent(
+        MockNodes.newNodeInfo(1, Resources.createResource(2048, 2))));
+
+    createSchedulingRequest(memory, vCores, "queueA", "user1", 1, 2);
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org