You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/01/18 22:11:17 UTC
[3/3] hadoop git commit: YARN-6599. Support anti-affinity constraint
via AppPlacementAllocator. (Wangda Tan via asuresh)
YARN-6599. Support anti-affinity constraint via AppPlacementAllocator. (Wangda Tan via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b82addc0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b82addc0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b82addc0
Branch: refs/heads/YARN-6592
Commit: b82addc0a51050accba4b705a613dc274e9ac90a
Parents: 12d4e3b
Author: Arun Suresh <as...@apache.org>
Authored: Thu Jan 18 14:10:30 2018 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Thu Jan 18 14:10:30 2018 -0800
----------------------------------------------------------------------
.../v2/app/rm/TestRMContainerAllocator.java | 15 +-
.../sls/scheduler/SLSCapacityScheduler.java | 15 +-
.../yarn/sls/scheduler/SLSFairScheduler.java | 12 +-
.../dev-support/findbugs-exclude.xml | 8 +
.../yarn/api/resource/PlacementConstraints.java | 43 +-
.../hadoop/yarn/conf/YarnConfiguration.java | 2 +-
...SchedulerInvalidResoureRequestException.java | 47 ++
.../api/impl/TestAMRMClientOnRMRestart.java | 9 +-
.../impl/pb/AllocateRequestPBImpl.java | 1 +
.../server/scheduler/SchedulerRequestKey.java | 11 +
.../resourcemanager/DefaultAMSProcessor.java | 13 +-
.../rmapp/attempt/RMAppAttemptImpl.java | 5 +-
.../scheduler/AbstractYarnScheduler.java | 3 +-
.../scheduler/AppSchedulingInfo.java | 205 +++++--
.../ApplicationPlacementAllocatorFactory.java | 68 +++
.../scheduler/ApplicationPlacementFactory.java | 63 ---
.../scheduler/ContainerUpdateContext.java | 4 +-
.../scheduler/SchedulerApplicationAttempt.java | 20 +-
.../scheduler/YarnScheduler.java | 15 +-
.../scheduler/capacity/CapacityScheduler.java | 54 +-
.../CapacitySchedulerConfiguration.java | 5 +
.../allocator/RegularContainerAllocator.java | 3 +-
.../scheduler/common/ContainerRequest.java | 12 +
.../scheduler/common/PendingAsk.java | 6 +
.../scheduler/common/fica/FiCaSchedulerApp.java | 6 +
.../constraint/AllocationTagsManager.java | 71 +--
.../constraint/AllocationTagsNamespaces.java | 31 --
.../constraint/PlacementConstraintsUtil.java | 165 ++++--
.../algorithm/DefaultPlacementAlgorithm.java | 2 +-
.../processor/PlacementProcessor.java | 8 +-
.../scheduler/fair/FairScheduler.java | 12 +-
.../scheduler/fifo/FifoScheduler.java | 7 +-
.../placement/AppPlacementAllocator.java | 66 ++-
.../LocalityAppPlacementAllocator.java | 35 +-
.../SingleConstraintAppPlacementAllocator.java | 531 +++++++++++++++++++
.../server/resourcemanager/Application.java | 9 +-
.../yarn/server/resourcemanager/MockAM.java | 51 ++
.../attempt/TestRMAppAttemptTransitions.java | 10 +-
.../rmcontainer/TestRMContainerImpl.java | 6 +-
.../scheduler/TestAppSchedulingInfo.java | 4 +-
.../capacity/CapacitySchedulerTestBase.java | 79 +++
.../capacity/TestCapacityScheduler.java | 90 +---
.../TestCapacitySchedulerAsyncScheduling.java | 2 +-
.../TestCapacitySchedulerAutoQueueCreation.java | 2 +-
...apacitySchedulerSchedulingRequestUpdate.java | 260 +++++++++
.../capacity/TestIncreaseAllocationExpirer.java | 2 +-
...estSchedulingRequestContainerAllocation.java | 277 ++++++++++
...hedulingRequestContainerAllocationAsync.java | 139 +++++
.../scheduler/capacity/TestUtils.java | 2 +
.../constraint/TestAllocationTagsManager.java | 30 +-
.../TestPlacementConstraintsUtil.java | 36 +-
.../scheduler/fair/FairSchedulerTestBase.java | 6 +-
.../fair/TestContinuousScheduling.java | 10 +-
.../scheduler/fair/TestFairScheduler.java | 30 +-
.../scheduler/fifo/TestFifoScheduler.java | 28 +-
...stSingleConstraintAppPlacementAllocator.java | 403 ++++++++++++++
56 files changed, 2557 insertions(+), 492 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 85e4181..7875917 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -111,6 +111,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -1751,6 +1752,7 @@ public class TestRMContainerAllocator {
super();
try {
Configuration conf = new Configuration();
+ init(conf);
reinitialize(conf, rmContext);
} catch (IOException ie) {
LOG.info("add application failed with ", ie);
@@ -1769,8 +1771,8 @@ public class TestRMContainerAllocator {
@Override
public synchronized Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
- List<ContainerId> release, List<String> blacklistAdditions,
- List<String> blacklistRemovals,
+ List<SchedulingRequest> schedulingRequests, List<ContainerId> release,
+ List<String> blacklistAdditions, List<String> blacklistRemovals,
ContainerUpdates updateRequests) {
List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
for (ResourceRequest req : ask) {
@@ -1785,7 +1787,7 @@ public class TestRMContainerAllocator {
lastBlacklistAdditions = blacklistAdditions;
lastBlacklistRemovals = blacklistRemovals;
Allocation allocation = super.allocate(
- applicationAttemptId, askCopy, release, blacklistAdditions,
+ applicationAttemptId, askCopy, schedulingRequests, release, blacklistAdditions,
blacklistRemovals, updateRequests);
if (forceResourceLimit != null) {
// Test wants to force the non-default resource limit
@@ -1805,6 +1807,7 @@ public class TestRMContainerAllocator {
super();
try {
Configuration conf = new Configuration();
+ init(conf);
reinitialize(conf, rmContext);
} catch (IOException ie) {
LOG.info("add application failed with ", ie);
@@ -1815,8 +1818,8 @@ public class TestRMContainerAllocator {
@Override
public synchronized Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
- List<ContainerId> release, List<String> blacklistAdditions,
- List<String> blacklistRemovals,
+ List<SchedulingRequest> schedulingRequests, List<ContainerId> release,
+ List<String> blacklistAdditions, List<String> blacklistRemovals,
ContainerUpdates updateRequests) {
List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
for (ResourceRequest req : ask) {
@@ -1827,7 +1830,7 @@ public class TestRMContainerAllocator {
}
SecurityUtil.setTokenServiceUseIp(false);
Allocation normalAlloc = super.allocate(
- applicationAttemptId, askCopy, release,
+ applicationAttemptId, askCopy, schedulingRequests, release,
blacklistAdditions, blacklistRemovals, updateRequests);
List<Container> containers = normalAlloc.getContainers();
if(containers.size() > 0) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
index 6848b22..35f3ed1 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
@@ -42,9 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -100,16 +99,17 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
@Override
public Allocation allocate(ApplicationAttemptId attemptId,
- List<ResourceRequest> resourceRequests, List<ContainerId> containerIds,
- List<String> strings, List<String> strings2,
- ContainerUpdates updateRequests) {
+ List<ResourceRequest> resourceRequests,
+ List<SchedulingRequest> schedulingRequests, List<ContainerId> containerIds,
+ List<String> strings, List<String> strings2, ContainerUpdates updateRequests) {
if (metricsON) {
final Timer.Context context = schedulerMetrics.getSchedulerAllocateTimer()
.time();
Allocation allocation = null;
try {
allocation = super
- .allocate(attemptId, resourceRequests, containerIds, strings,
+ .allocate(attemptId, resourceRequests, schedulingRequests,
+ containerIds, strings,
strings2, updateRequests);
return allocation;
} finally {
@@ -123,7 +123,8 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
}
}
} else {
- return super.allocate(attemptId, resourceRequests, containerIds, strings,
+ return super.allocate(attemptId, resourceRequests, schedulingRequests,
+ containerIds, strings,
strings2, updateRequests);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
index 8e49c51..c27ab3e 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
@@ -39,8 +40,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptR
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
@@ -94,7 +93,8 @@ public class SLSFairScheduler extends FairScheduler
@Override
public Allocation allocate(ApplicationAttemptId attemptId,
- List<ResourceRequest> resourceRequests, List<ContainerId> containerIds,
+ List<ResourceRequest> resourceRequests,
+ List<SchedulingRequest> schedulingRequests, List<ContainerId> containerIds,
List<String> blacklistAdditions, List<String> blacklistRemovals,
ContainerUpdates updateRequests) {
if (metricsON) {
@@ -102,7 +102,8 @@ public class SLSFairScheduler extends FairScheduler
.time();
Allocation allocation = null;
try {
- allocation = super.allocate(attemptId, resourceRequests, containerIds,
+ allocation = super.allocate(attemptId, resourceRequests,
+ schedulingRequests, containerIds,
blacklistAdditions, blacklistRemovals, updateRequests);
return allocation;
} finally {
@@ -116,7 +117,8 @@ public class SLSFairScheduler extends FairScheduler
}
}
} else {
- return super.allocate(attemptId, resourceRequests, containerIds,
+ return super.allocate(attemptId, resourceRequests, schedulingRequests,
+ containerIds,
blacklistAdditions, blacklistRemovals, updateRequests);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 6a10312..81b8825 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -650,4 +650,12 @@
<Method name="equals" />
<Bug pattern="EQ_OVERRIDING_EQUALS_NOT_SYMMETRIC" />
</Match>
+
+ <!-- Null pointer exception needs to be ignored here as Findbugs doesn't properly detect code logic -->
+ <Match>
+ <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SingleConstraintAppPlacementAllocator" />
+ <Method name="validateAndSetSchedulingRequest" />
+ <Bug pattern="NP_NULL_ON_SOME_PATH" />
+ </Match>
+
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
index c8991cb..ba1beae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
@@ -20,8 +20,12 @@ package org.apache.hadoop.yarn.api.resource;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.DelayedOr;
@@ -47,6 +51,14 @@ public final class PlacementConstraints {
public static final String NODE = PlacementConstraint.NODE_SCOPE;
public static final String RACK = PlacementConstraint.RACK_SCOPE;
+ public static final String NODE_PARTITION = "yarn_node_partition/";
+
+ private static final String APPLICATION_LABEL_PREFIX =
+ "yarn_application_label/";
+
+ @InterfaceAudience.Private
+ public static final String APPLICATION_LABEL_INTRA_APPLICATION =
+ APPLICATION_LABEL_PREFIX + "%intra_app%";
/**
* Creates a constraint that requires allocations to be placed on nodes that
@@ -187,6 +199,20 @@ public final class PlacementConstraints {
}
/**
+ * Constructs a target expression on a node partition. It is satisfied if
+ * the node's partition is one of the specified nodePartitions.
+ *
+ * @param nodePartitions the set of values that the attribute should take
+ * values from
+ * @return the resulting expression on the node attribute
+ */
+ public static TargetExpression nodePartition(
+ String... nodePartitions) {
+ return new TargetExpression(TargetType.NODE_ATTRIBUTE, NODE_PARTITION,
+ nodePartitions);
+ }
+
+ /**
* Constructs a target expression on an allocation tag. It is satisfied if
* the there are allocations with one of the given tags.
*
@@ -198,6 +224,22 @@ public final class PlacementConstraints {
return new TargetExpression(TargetType.ALLOCATION_TAG, null,
allocationTags);
}
+
+ /**
+ * Constructs a target expression on an allocation tag. It is satisfied if
+ * there are allocations with one of the given tags. Compared to
+ * {@link PlacementTargets#allocationTag(String...)}, this only checks tags
+ * within the application.
+ *
+ * @param allocationTags the set of tags that the attribute should take
+ * values from
+ * @return the resulting expression on the allocation tags
+ */
+ public static TargetExpression allocationTagToIntraApp(
+ String... allocationTags) {
+ return new TargetExpression(TargetType.ALLOCATION_TAG,
+ APPLICATION_LABEL_INTRA_APPLICATION, allocationTags);
+ }
}
// Creation of compound constraints.
@@ -277,5 +319,4 @@ public final class PlacementConstraints {
public static PlacementConstraint build(AbstractConstraint constraintExpr) {
return constraintExpr.build();
}
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index af83d8d..ea8f367 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -541,7 +541,7 @@ public class YarnConfiguration extends Configuration {
public static final String RM_PLACEMENT_CONSTRAINTS_ENABLED =
RM_PREFIX + "placement-constraints.enabled";
- public static final boolean DEFAULT_RM_PLACEMENT_CONSTRAINTS_ENABLED = true;
+ public static final boolean DEFAULT_RM_PLACEMENT_CONSTRAINTS_ENABLED = false;
public static final String RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS =
RM_PREFIX + "placement-constraints.retry-attempts";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/SchedulerInvalidResoureRequestException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/SchedulerInvalidResoureRequestException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/SchedulerInvalidResoureRequestException.java
new file mode 100644
index 0000000..f55ad83
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/SchedulerInvalidResoureRequestException.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.exceptions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This exception is thrown when the scheduler fails to handle a new or updated
+ * {@link org.apache.hadoop.yarn.api.records.SchedulingRequest}/
+ * {@link org.apache.hadoop.yarn.api.records.ResourceRequest} added to the
+ * scheduler.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SchedulerInvalidResoureRequestException extends YarnRuntimeException {
+ private static final long serialVersionUID = 10081123982L;
+
+ public SchedulerInvalidResoureRequestException(String message) {
+ super(message);
+ }
+
+ public SchedulerInvalidResoureRequestException(Throwable cause) {
+ super(cause);
+ }
+
+ public SchedulerInvalidResoureRequestException(String message,
+ Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
index 337d7d4..11d703d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@@ -545,6 +546,7 @@ public class TestAMRMClientOnRMRestart {
super();
try {
Configuration conf = new Configuration();
+ init(conf);
reinitialize(conf, rmContext);
} catch (IOException ie) {
assert (false);
@@ -563,8 +565,8 @@ public class TestAMRMClientOnRMRestart {
@Override
public synchronized Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
- List<ContainerId> release, List<String> blacklistAdditions,
- List<String> blacklistRemovals,
+ List<SchedulingRequest> schedulingRequests, List<ContainerId> release,
+ List<String> blacklistAdditions, List<String> blacklistRemovals,
ContainerUpdates updateRequests) {
List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
for (ResourceRequest req : ask) {
@@ -580,7 +582,8 @@ public class TestAMRMClientOnRMRestart {
lastDecrease = updateRequests.getDecreaseRequests();
lastBlacklistAdditions = blacklistAdditions;
lastBlacklistRemovals = blacklistRemovals;
- return super.allocate(applicationAttemptId, askCopy, release,
+ return super.allocate(applicationAttemptId, askCopy, schedulingRequests,
+ release,
blacklistAdditions, blacklistRemovals, updateRequests);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java
index b460044..50672a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java
@@ -194,6 +194,7 @@ public class AllocateRequestPBImpl extends AllocateRequest {
public void setSchedulingRequests(
List<SchedulingRequest> schedulingRequests) {
if (schedulingRequests == null) {
+ builder.clearSchedulingRequests();
return;
}
initSchedulingRequests();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
index c4f37f6..0fce083 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
/**
@@ -45,6 +46,16 @@ public final class SchedulerRequestKey implements
req.getAllocationRequestId(), null);
}
+ /**
+ * Factory method to generate a SchedulerRequestKey from a SchedulingRequest.
+ * @param req SchedulingRequest
+ * @return SchedulerRequestKey
+ */
+ public static SchedulerRequestKey create(SchedulingRequest req) {
+ return new SchedulerRequestKey(req.getPriority(),
+ req.getAllocationRequestId(), null);
+ }
+
public static SchedulerRequestKey create(UpdateContainerRequest req,
SchedulerRequestKey schedulerRequestKey) {
return new SchedulerRequestKey(schedulerRequestKey.getPriority(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
index 713947f..18ab473 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
+import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -273,10 +274,14 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
" state, ignore container allocate request.");
allocation = EMPTY_ALLOCATION;
} else {
- allocation =
- getScheduler().allocate(appAttemptId, ask, release,
- blacklistAdditions, blacklistRemovals,
- containerUpdateRequests);
+ try {
+ allocation = getScheduler().allocate(appAttemptId, ask,
+ request.getSchedulingRequests(), release,
+ blacklistAdditions, blacklistRemovals, containerUpdateRequests);
+ } catch (SchedulerInvalidResoureRequestException e) {
+ LOG.warn("Exceptions caught when scheduler handling requests");
+ throw new YarnException(e);
+ }
}
if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index cf10be4..8c2f4e4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -1113,8 +1113,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
Allocation amContainerAllocation =
appAttempt.scheduler.allocate(
appAttempt.applicationAttemptId,
- appAttempt.amReqs,
- EMPTY_CONTAINER_RELEASE_LIST,
+ appAttempt.amReqs, null, EMPTY_CONTAINER_RELEASE_LIST,
amBlacklist.getBlacklistAdditions(),
amBlacklist.getBlacklistRemovals(),
new ContainerUpdates());
@@ -1140,7 +1139,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
// Acquire the AM container from the scheduler.
Allocation amContainerAllocation =
appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
- EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null,
+ EMPTY_CONTAINER_REQUEST_LIST, null, EMPTY_CONTAINER_RELEASE_LIST, null,
null, new ContainerUpdates());
// There must be at least one container allocated, because a
// CONTAINER_ALLOCATED is emitted after an RMContainer is constructed,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 74456f6..e3914c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
@@ -1151,7 +1152,7 @@ public abstract class AbstractYarnScheduler
*
* @param asks resource requests
*/
- protected void normalizeRequests(List<ResourceRequest> asks) {
+ protected void normalizeResourceRequests(List<ResourceRequest> asks) {
for (ResourceRequest ask: asks) {
ask.setCapability(getNormalizedResource(ask.getCapability()));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 8858d3b..7d6f233 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -49,7 +51,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.Applicatio
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PendingAskUpdateResult;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SingleConstraintAppPlacementAllocator;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@@ -91,11 +95,12 @@ public class AppSchedulingInfo {
public final ContainerUpdateContext updateContext;
public final Map<String, String> applicationSchedulingEnvs = new HashMap<>();
+ private final RMContext rmContext;
public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user,
Queue queue, AbstractUsersManager abstractUsersManager, long epoch,
ResourceUsage appResourceUsage,
- Map<String, String> applicationSchedulingEnvs) {
+ Map<String, String> applicationSchedulingEnvs, RMContext rmContext) {
this.applicationAttemptId = appAttemptId;
this.applicationId = appAttemptId.getApplicationId();
this.queue = queue;
@@ -105,6 +110,7 @@ public class AppSchedulingInfo {
epoch << ResourceManager.EPOCH_BIT_SHIFT);
this.appResourceUsage = appResourceUsage;
this.applicationSchedulingEnvs.putAll(applicationSchedulingEnvs);
+ this.rmContext = rmContext;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
updateContext = new ContainerUpdateContext(this);
@@ -163,74 +169,153 @@ public class AppSchedulingInfo {
* application, by asking for more resources and releasing resources acquired
* by the application.
*
- * @param requests
- * resources to be acquired
+ * @param resourceRequests resource requests to be allocated
* @param recoverPreemptedRequestForAContainer
- * recover ResourceRequest on preemption
+ * recover ResourceRequest/SchedulingRequest on preemption
* @return true if any resource was updated, false otherwise
*/
- public boolean updateResourceRequests(List<ResourceRequest> requests,
+ public boolean updateResourceRequests(List<ResourceRequest> resourceRequests,
boolean recoverPreemptedRequestForAContainer) {
- if (null == requests || requests.isEmpty()) {
- return false;
+ // Flag to track if any incoming requests update "ANY" requests
+ boolean offswitchResourcesUpdated;
+
+ writeLock.lock();
+ try {
+ // Update AppPlacementAllocator by requests
+ offswitchResourcesUpdated = internalAddResourceRequests(
+ recoverPreemptedRequestForAContainer, resourceRequests);
+ } finally {
+ writeLock.unlock();
}
+ return offswitchResourcesUpdated;
+ }
+
+ /**
+ * The ApplicationMaster is updating resource requirements for the
+ * application, by asking for more resources and releasing resources acquired
+ * by the application.
+ *
+ * @param dedupRequests (dedup) resource requests to be allocated
+ * @param recoverPreemptedRequestForAContainer
+ * recover ResourceRequest/SchedulingRequest on preemption
+ * @return true if any resource was updated, false otherwise
+ */
+ public boolean updateResourceRequests(
+ Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests,
+ boolean recoverPreemptedRequestForAContainer) {
// Flag to track if any incoming requests update "ANY" requests
- boolean offswitchResourcesUpdated = false;
+ boolean offswitchResourcesUpdated;
+ writeLock.lock();
try {
- this.writeLock.lock();
-
- // A map to group resource requests and dedup
- Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests =
- new HashMap<>();
+ // Update AppPlacementAllocator by requests
+ offswitchResourcesUpdated = internalAddResourceRequests(
+ recoverPreemptedRequestForAContainer, dedupRequests);
+ } finally {
+ writeLock.unlock();
+ }
- // Group resource request by schedulerRequestKey and resourceName
- for (ResourceRequest request : requests) {
- SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
- if (!dedupRequests.containsKey(schedulerKey)) {
- dedupRequests.put(schedulerKey, new HashMap<>());
- }
- dedupRequests.get(schedulerKey).put(request.getResourceName(), request);
- }
+ return offswitchResourcesUpdated;
+ }
- // Update AppPlacementAllocator by dedup requests.
- offswitchResourcesUpdated =
- addRequestToAppPlacement(
- recoverPreemptedRequestForAContainer, dedupRequests);
+ /**
+ * The ApplicationMaster is updating resource requirements for the
+ * application, by asking for more resources and releasing resources acquired
+ * by the application.
+ *
+ * @param schedulingRequests scheduling requests to be allocated
+ * @param recoverPreemptedRequestForAContainer
+ * recover ResourceRequest/SchedulingRequest on preemption
+ * @return true if any resource was updated, false otherwise
+ */
+ public boolean updateSchedulingRequests(
+ List<SchedulingRequest> schedulingRequests,
+ boolean recoverPreemptedRequestForAContainer) {
+ // Flag to track if any incoming requests update "ANY" requests
+ boolean offswitchResourcesUpdated;
- return offswitchResourcesUpdated;
+ writeLock.lock();
+ try {
+ // Update AppPlacementAllocator by requests
+ offswitchResourcesUpdated = addSchedulingRequests(
+ recoverPreemptedRequestForAContainer, schedulingRequests);
} finally {
- this.writeLock.unlock();
+ writeLock.unlock();
}
+
+ return offswitchResourcesUpdated;
}
public void removeAppPlacement(SchedulerRequestKey schedulerRequestKey) {
schedulerKeyToAppPlacementAllocator.remove(schedulerRequestKey);
}
- boolean addRequestToAppPlacement(
+ private boolean addSchedulingRequests(
+ boolean recoverPreemptedRequestForAContainer,
+ List<SchedulingRequest> schedulingRequests) {
+ // Do we need to update pending resource for app/queue, etc.?
+ boolean requireUpdatePendingResource = false;
+
+ for (SchedulingRequest request : schedulingRequests) {
+ SchedulerRequestKey schedulerRequestKey = SchedulerRequestKey.create(
+ request);
+
+ AppPlacementAllocator appPlacementAllocator =
+ getAndAddAppPlacementAllocatorIfNotExist(schedulerRequestKey,
+ SingleConstraintAppPlacementAllocator.class.getCanonicalName());
+
+ // Update AppPlacementAllocator
+ PendingAskUpdateResult pendingAmountChanges =
+ appPlacementAllocator.updatePendingAsk(schedulerRequestKey,
+ request, recoverPreemptedRequestForAContainer);
+
+ if (null != pendingAmountChanges) {
+ updatePendingResources(pendingAmountChanges, schedulerRequestKey,
+ queue.getMetrics());
+ requireUpdatePendingResource = true;
+ }
+ }
+
+ return requireUpdatePendingResource;
+ }
+
+ /**
+ * Get the AppPlacementAllocator, inserting it if it doesn't exist. This
+ * should be protected by the write lock.
+ * @param schedulerRequestKey schedulerRequestKey
+ * @param placementTypeClass placementTypeClass
+ * @return AppPlacementAllocator
+ */
+ private AppPlacementAllocator<SchedulerNode> getAndAddAppPlacementAllocatorIfNotExist(
+ SchedulerRequestKey schedulerRequestKey, String placementTypeClass) {
+ AppPlacementAllocator<SchedulerNode> appPlacementAllocator;
+ if ((appPlacementAllocator = schedulerKeyToAppPlacementAllocator.get(
+ schedulerRequestKey)) == null) {
+ appPlacementAllocator =
+ ApplicationPlacementAllocatorFactory.getAppPlacementAllocator(
+ placementTypeClass, this, schedulerRequestKey, rmContext);
+ schedulerKeyToAppPlacementAllocator.put(schedulerRequestKey,
+ appPlacementAllocator);
+ }
+ return appPlacementAllocator;
+ }
+
+ private boolean internalAddResourceRequests(
boolean recoverPreemptedRequestForAContainer,
Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests) {
boolean offswitchResourcesUpdated = false;
for (Map.Entry<SchedulerRequestKey, Map<String, ResourceRequest>> entry :
dedupRequests.entrySet()) {
SchedulerRequestKey schedulerRequestKey = entry.getKey();
-
- if (!schedulerKeyToAppPlacementAllocator
- .containsKey(schedulerRequestKey)) {
- AppPlacementAllocator<SchedulerNode> placementAllocatorInstance = ApplicationPlacementFactory
- .getAppPlacementAllocator(applicationSchedulingEnvs
- .get(ApplicationSchedulingConfig.ENV_APPLICATION_PLACEMENT_TYPE_CLASS));
- placementAllocatorInstance.setAppSchedulingInfo(this);
-
- schedulerKeyToAppPlacementAllocator.put(schedulerRequestKey,
- placementAllocatorInstance);
- }
+ AppPlacementAllocator<SchedulerNode> appPlacementAllocator =
+ getAndAddAppPlacementAllocatorIfNotExist(schedulerRequestKey,
+ applicationSchedulingEnvs.get(
+ ApplicationSchedulingConfig.ENV_APPLICATION_PLACEMENT_TYPE_CLASS));
// Update AppPlacementAllocator
- PendingAskUpdateResult pendingAmountChanges = schedulerKeyToAppPlacementAllocator
- .get(schedulerRequestKey).updatePendingAsk(entry.getValue().values(),
+ PendingAskUpdateResult pendingAmountChanges =
+ appPlacementAllocator.updatePendingAsk(entry.getValue().values(),
recoverPreemptedRequestForAContainer);
if (null != pendingAmountChanges) {
@@ -242,6 +327,29 @@ public class AppSchedulingInfo {
return offswitchResourcesUpdated;
}
+ private boolean internalAddResourceRequests(boolean recoverPreemptedRequestForAContainer,
+ List<ResourceRequest> resourceRequests) {
+ if (null == resourceRequests || resourceRequests.isEmpty()) {
+ return false;
+ }
+
+ // A map to group resource requests and dedup
+ Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests =
+ new HashMap<>();
+
+ // Group resource request by schedulerRequestKey and resourceName
+ for (ResourceRequest request : resourceRequests) {
+ SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
+ if (!dedupRequests.containsKey(schedulerKey)) {
+ dedupRequests.put(schedulerKey, new HashMap<>());
+ }
+ dedupRequests.get(schedulerKey).put(request.getResourceName(), request);
+ }
+
+ return internalAddResourceRequests(recoverPreemptedRequestForAContainer,
+ dedupRequests);
+ }
+
private void updatePendingResources(PendingAskUpdateResult updateResult,
SchedulerRequestKey schedulerKey, QueueMetrics metrics) {
@@ -629,13 +737,22 @@ public class AppSchedulingInfo {
}
}
- public boolean acceptNodePartition(SchedulerRequestKey schedulerKey,
- String nodePartition, SchedulingMode schedulingMode) {
+ /**
+ * Pre-check a node to see if it satisfies the given schedulerKey and
+ * scheduling mode.
+ *
+ * @param schedulerKey schedulerKey
+ * @param schedulerNode schedulerNode
+ * @param schedulingMode schedulingMode
+ * @return can use the node or not.
+ */
+ public boolean precheckNode(SchedulerRequestKey schedulerKey,
+ SchedulerNode schedulerNode, SchedulingMode schedulingMode) {
try {
this.readLock.lock();
AppPlacementAllocator ap =
schedulerKeyToAppPlacementAllocator.get(schedulerKey);
- return (ap != null) && ap.acceptNodePartition(nodePartition,
+ return (ap != null) && ap.precheckNode(schedulerNode,
schedulingMode);
} finally {
this.readLock.unlock();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementAllocatorFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementAllocatorFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementAllocatorFactory.java
new file mode 100644
index 0000000..a4e5484
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementAllocatorFactory.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+
+/**
+ * Factory class to build various application placement policies.
+ */
+@Public
+@Unstable
+public class ApplicationPlacementAllocatorFactory {
+
+ /**
+ * Get AppPlacementAllocator related to the placement type requested.
+ *
+ * @param appPlacementAllocatorName
+ * allocator class name.
+ * @return Specific AppPlacementAllocator instance based on type
+ */
+ public static AppPlacementAllocator<SchedulerNode> getAppPlacementAllocator(
+ String appPlacementAllocatorName, AppSchedulingInfo appSchedulingInfo,
+ SchedulerRequestKey schedulerRequestKey, RMContext rmContext) {
+ Class<?> policyClass;
+ try {
+ if (appPlacementAllocatorName == null) {
+ policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS;
+ } else {
+ policyClass = Class.forName(appPlacementAllocatorName);
+ }
+ } catch (ClassNotFoundException e) {
+ policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS;
+ }
+
+ if (!AppPlacementAllocator.class.isAssignableFrom(policyClass)) {
+ policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS;
+ }
+
+ @SuppressWarnings("unchecked")
+ AppPlacementAllocator<SchedulerNode> placementAllocatorInstance = (AppPlacementAllocator<SchedulerNode>) ReflectionUtils
+ .newInstance(policyClass, null);
+ placementAllocatorInstance.initialize(appSchedulingInfo,
+ schedulerRequestKey, rmContext);
+ return placementAllocatorInstance;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementFactory.java
deleted file mode 100644
index 40c8d05..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementFactory.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
-
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
-
-/**
- * Factory class to build various application placement policies.
- */
-@Public
-@Unstable
-public class ApplicationPlacementFactory {
-
- /**
- * Get AppPlacementAllocator related to the placement type requested.
- *
- * @param appPlacementAllocatorName
- * allocator class name.
- * @return Specific AppPlacementAllocator instance based on type
- */
- public static AppPlacementAllocator<SchedulerNode> getAppPlacementAllocator(
- String appPlacementAllocatorName) {
- Class<?> policyClass;
- try {
- if (appPlacementAllocatorName == null) {
- policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS;
- } else {
- policyClass = Class.forName(appPlacementAllocatorName);
- }
- } catch (ClassNotFoundException e) {
- policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS;
- }
-
- if (!AppPlacementAllocator.class.isAssignableFrom(policyClass)) {
- policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS;
- }
-
- @SuppressWarnings("unchecked")
- AppPlacementAllocator<SchedulerNode> placementAllocatorInstance = (AppPlacementAllocator<SchedulerNode>) ReflectionUtils
- .newInstance(policyClass, null);
- return placementAllocatorInstance;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
index f410db1..491a9ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
@@ -146,7 +146,7 @@ public class ContainerUpdateContext {
createResourceRequests(rmContainer, schedulerNode,
schedulerKey, resToIncrease);
updateResReqs.put(schedulerKey, resMap);
- appSchedulingInfo.addRequestToAppPlacement(false, updateResReqs);
+ appSchedulingInfo.updateResourceRequests(updateResReqs, false);
}
return true;
}
@@ -290,7 +290,7 @@ public class ContainerUpdateContext {
(rmContainer, node, schedulerKey,
rmContainer.getContainer().getResource());
reqsToUpdate.put(schedulerKey, resMap);
- appSchedulingInfo.addRequestToAppPlacement(true, reqsToUpdate);
+ appSchedulingInfo.updateResourceRequests(reqsToUpdate, true);
return UNDEFINED;
}
return retVal;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 3930a35..753c2b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.api.ContainerType;
@@ -231,7 +232,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user,
queue, abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage,
- applicationSchedulingEnvs);
+ applicationSchedulingEnvs, rmContext);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
@@ -451,6 +452,23 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
writeLock.unlock();
}
}
+
+ public boolean updateSchedulingRequests(
+ List<SchedulingRequest> requests) {
+ if (requests == null) {
+ return false;
+ }
+
+ try {
+ writeLock.lock();
+ if (!isStopped) {
+ return appSchedulingInfo.updateSchedulingRequests(requests, false);
+ }
+ return false;
+ } finally {
+ writeLock.unlock();
+ }
+ }
public void recoverResourceRequestsForContainer(
ContainerRequest containerRequest) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
index 93ca7c2..43d55c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -132,18 +133,18 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
*
* @param appAttemptId
* @param ask
+ * @param schedulingRequests
* @param release
- * @param blacklistAdditions
- * @param blacklistRemovals
- * @param updateRequests
- * @return the {@link Allocation} for the application
+ * @param blacklistAdditions
+ * @param blacklistRemovals
+ * @param updateRequests @return the {@link Allocation} for the application
*/
@Public
@Stable
Allocation allocate(ApplicationAttemptId appAttemptId,
- List<ResourceRequest> ask, List<ContainerId> release,
- List<String> blacklistAdditions, List<String> blacklistRemovals,
- ContainerUpdates updateRequests);
+ List<ResourceRequest> ask, List<SchedulingRequest> schedulingRequests,
+ List<ContainerId> release, List<String> blacklistAdditions,
+ List<String> blacklistRemovals, ContainerUpdates updateRequests);
/**
* Get node resource usage report.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 956d840..0d781ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -58,8 +58,11 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
+import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
@@ -1013,12 +1016,29 @@ public class CapacityScheduler extends
}
}
+ /**
+ * Normalize a list of SchedulingRequests.
+ *
+ * @param asks scheduling requests
+ */
+ private void normalizeSchedulingRequests(List<SchedulingRequest> asks) {
+ if (asks == null) {
+ return;
+ }
+ for (SchedulingRequest ask: asks) {
+ ResourceSizing sizing = ask.getResourceSizing();
+ if (sizing != null && sizing.getResources() != null) {
+ sizing.setResources(getNormalizedResource(sizing.getResources()));
+ }
+ }
+ }
+
@Override
@Lock(Lock.NoLock.class)
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
- List<ResourceRequest> ask, List<ContainerId> release,
- List<String> blacklistAdditions, List<String> blacklistRemovals,
- ContainerUpdates updateRequests) {
+ List<ResourceRequest> ask, List<SchedulingRequest> schedulingRequests,
+ List<ContainerId> release, List<String> blacklistAdditions,
+ List<String> blacklistRemovals, ContainerUpdates updateRequests) {
FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
if (application == null) {
LOG.error("Calling allocate on removed or non existent application " +
@@ -1026,6 +1046,18 @@ public class CapacityScheduler extends
return EMPTY_ALLOCATION;
}
+ if ((!getConfiguration().getBoolean(
+ CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
+ CapacitySchedulerConfiguration.DEFAULT_SCHEDULING_REQUEST_ALLOWED))
+ && schedulingRequests != null && (!schedulingRequests.isEmpty())) {
+ throw new SchedulerInvalidResoureRequestException(
+ "Application attempt:" + applicationAttemptId
+ + " is using SchedulingRequest, which is disabled. Please update "
+ + CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED
+ + " to true in capacity-scheduler.xml in order to use this "
+ + "feature.");
+ }
+
// The allocate may be the leftover from previous attempt, and it will
// impact current attempt, such as confuse the request and allocation for
// current attempt's AM container.
@@ -1046,7 +1078,10 @@ public class CapacityScheduler extends
LeafQueue updateDemandForQueue = null;
// Sanity check for new allocation requests
- normalizeRequests(ask);
+ normalizeResourceRequests(ask);
+
+ // Normalize scheduling requests
+ normalizeSchedulingRequests(schedulingRequests);
Allocation allocation;
@@ -1059,7 +1094,8 @@ public class CapacityScheduler extends
}
// Process resource requests
- if (!ask.isEmpty()) {
+ if (!ask.isEmpty() || (schedulingRequests != null && !schedulingRequests
+ .isEmpty())) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"allocate: pre-update " + applicationAttemptId + " ask size ="
@@ -1068,7 +1104,8 @@ public class CapacityScheduler extends
}
// Update application requests
- if (application.updateResourceRequests(ask)) {
+ if (application.updateResourceRequests(ask) || application
+ .updateSchedulingRequests(schedulingRequests)) {
updateDemandForQueue = (LeafQueue) application.getQueue();
}
@@ -2518,10 +2555,9 @@ public class CapacityScheduler extends
// Validate placement constraint is satisfied before
// committing the request.
try {
- if (!PlacementConstraintsUtil.canSatisfyConstraints(
+ if (!PlacementConstraintsUtil.canSatisfySingleConstraint(
appAttempt.getApplicationId(),
- schedulingRequest.getAllocationTags(),
- schedulerNode,
+ schedulingRequest.getAllocationTags(), schedulerNode,
rmContext.getPlacementConstraintManager(),
rmContext.getAllocationTagsManager())) {
LOG.debug("Failed to allocate container for application "
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 8aa41ee..fb133cb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -76,6 +76,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
@Private
public static final String PREFIX = "yarn.scheduler.capacity.";
+
+ @Private
+ public static final String SCHEDULING_REQUEST_ALLOWED =
+ PREFIX + "scheduling-request.allowed";
+ public static final boolean DEFAULT_SCHEDULING_REQUEST_ALLOWED = false;
@Private
public static final String DOT = ".";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 2642532..afa468b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -143,8 +143,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
// Is the nodePartition of pending request matches the node's partition
// If not match, jump to next priority.
- if (!appInfo.acceptNodePartition(schedulerKey, node.getPartition(),
- schedulingMode)) {
+ if (!appInfo.precheckNode(schedulerKey, node, schedulingMode)) {
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerRequest.java
index 075db79..cad15a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerRequest.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import java.util.List;
@@ -43,12 +44,23 @@ import java.util.List;
*/
public class ContainerRequest {
private List<ResourceRequest> requests;
+ private SchedulingRequest schedulingRequest;
public ContainerRequest(List<ResourceRequest> requests) {
this.requests = requests;
+ schedulingRequest = null;
+ }
+
+ public ContainerRequest(SchedulingRequest schedulingRequest) {
+ this.schedulingRequest = schedulingRequest;
+ this.requests = null;
}
public List<ResourceRequest> getResourceRequests() {
return requests;
}
+
+ public SchedulingRequest getSchedulingRequest() {
+ return schedulingRequest;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
index 85d8715..2ed3e83 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@@ -31,6 +32,11 @@ public class PendingAsk {
private final int count;
public final static PendingAsk ZERO = new PendingAsk(Resources.none(), 0);
+ public PendingAsk(ResourceSizing sizing) {
+ this.perAllocationResource = sizing.getResources();
+ this.count = sizing.getNumAllocations();
+ }
+
public PendingAsk(Resource res, int num) {
this.perAllocationResource = res;
this.count = num;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 4ea0347..7eb1e31 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -542,6 +542,12 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
schedulerContainer.getRmContainer().getContainer());
((RMContainerImpl) rmContainer).setContainerRequest(
containerRequest);
+
+ // If this is from a SchedulingRequest, set allocation tags.
+ if (containerRequest.getSchedulingRequest() != null) {
+ ((RMContainerImpl) rmContainer).setAllocationTags(
+ containerRequest.getSchedulingRequest().getAllocationTags());
+ }
}
attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
index 4bb3e79..962e548 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.log4j.Logger;
@@ -287,21 +288,15 @@ public class AllocationTagsManager {
* {@link SchedulingRequest#getAllocationTags()}
* application_id will be added to allocationTags.
*/
+ @SuppressWarnings("unchecked")
public void addContainer(NodeId nodeId, ContainerId containerId,
Set<String> allocationTags) {
+ // Do nothing for null or empty allocation tags.
+ if (allocationTags == null || allocationTags.isEmpty()) {
+ return;
+ }
ApplicationId applicationId =
containerId.getApplicationAttemptId().getApplicationId();
- String applicationIdTag =
- AllocationTagsNamespaces.APP_ID + applicationId.toString();
-
- boolean useSet = false;
- if (allocationTags != null && !allocationTags.isEmpty()) {
- // Copy before edit it.
- allocationTags = new HashSet<>(allocationTags);
- allocationTags.add(applicationIdTag);
- useSet = true;
- }
-
writeLock.lock();
try {
TypeToCountedTags perAppTagsMapping = perAppNodeMappings
@@ -311,19 +306,12 @@ public class AllocationTagsManager {
// Covering test-cases where context is mocked
String nodeRack = (rmContext.getRMNodes() != null
&& rmContext.getRMNodes().get(nodeId) != null)
- ? rmContext.getRMNodes().get(nodeId).getRackName()
- : "default-rack";
- if (useSet) {
- perAppTagsMapping.addTags(nodeId, allocationTags);
- perAppRackTagsMapping.addTags(nodeRack, allocationTags);
- globalNodeMapping.addTags(nodeId, allocationTags);
- globalRackMapping.addTags(nodeRack, allocationTags);
- } else {
- perAppTagsMapping.addTag(nodeId, applicationIdTag);
- perAppRackTagsMapping.addTag(nodeRack, applicationIdTag);
- globalNodeMapping.addTag(nodeId, applicationIdTag);
- globalRackMapping.addTag(nodeRack, applicationIdTag);
- }
+ ? rmContext.getRMNodes().get(nodeId).getRackName() :
+ "default-rack";
+ perAppTagsMapping.addTags(nodeId, allocationTags);
+ perAppRackTagsMapping.addTags(nodeRack, allocationTags);
+ globalNodeMapping.addTags(nodeId, allocationTags);
+ globalRackMapping.addTags(nodeRack, allocationTags);
if (LOG.isDebugEnabled()) {
LOG.debug("Added container=" + containerId + " with tags=["
@@ -341,20 +329,15 @@ public class AllocationTagsManager {
* @param containerId containerId.
* @param allocationTags allocation tags for given container
*/
+ @SuppressWarnings("unchecked")
public void removeContainer(NodeId nodeId,
ContainerId containerId, Set<String> allocationTags) {
+ // Do nothing for null or empty allocation tags.
+ if (allocationTags == null || allocationTags.isEmpty()) {
+ return;
+ }
ApplicationId applicationId =
containerId.getApplicationAttemptId().getApplicationId();
- String applicationIdTag =
- AllocationTagsNamespaces.APP_ID + applicationId.toString();
- boolean useSet = false;
-
- if (allocationTags != null && !allocationTags.isEmpty()) {
- // Copy before edit it.
- allocationTags = new HashSet<>(allocationTags);
- allocationTags.add(applicationIdTag);
- useSet = true;
- }
writeLock.lock();
try {
@@ -368,19 +351,12 @@ public class AllocationTagsManager {
// Covering test-cases where context is mocked
String nodeRack = (rmContext.getRMNodes() != null
&& rmContext.getRMNodes().get(nodeId) != null)
- ? rmContext.getRMNodes().get(nodeId).getRackName()
- : "default-rack";
- if (useSet) {
- perAppTagsMapping.removeTags(nodeId, allocationTags);
- perAppRackTagsMapping.removeTags(nodeRack, allocationTags);
- globalNodeMapping.removeTags(nodeId, allocationTags);
- globalRackMapping.removeTags(nodeRack, allocationTags);
- } else {
- perAppTagsMapping.removeTag(nodeId, applicationIdTag);
- perAppRackTagsMapping.removeTag(nodeRack, applicationIdTag);
- globalNodeMapping.removeTag(nodeId, applicationIdTag);
- globalRackMapping.removeTag(nodeRack, applicationIdTag);
- }
+ ? rmContext.getRMNodes().get(nodeId).getRackName() :
+ "default-rack";
+ perAppTagsMapping.removeTags(nodeId, allocationTags);
+ perAppRackTagsMapping.removeTags(nodeRack, allocationTags);
+ globalNodeMapping.removeTags(nodeId, allocationTags);
+ globalRackMapping.removeTags(nodeRack, allocationTags);
if (perAppTagsMapping.isEmpty()) {
perAppNodeMappings.remove(applicationId);
@@ -602,6 +578,7 @@ public class AllocationTagsManager {
* @throws InvalidAllocationTagsQueryException when illegal query
* parameter specified
*/
+ @SuppressWarnings("unchecked")
public long getRackCardinalityByOp(String rack, ApplicationId applicationId,
Set<String> tags, LongBinaryOperator op)
throws InvalidAllocationTagsQueryException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsNamespaces.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsNamespaces.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsNamespaces.java
deleted file mode 100644
index 43fcfe5..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsNamespaces.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * /
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
-
-/**
- * Predefined namespaces for tags
- *
- * Same as namespace of resource types. Namespaces of placement tags are start
- * with alphabets and ended with "/"
- */
-public class AllocationTagsNamespaces {
- public static final String APP_ID = "yarn_app_id/";
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org