You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/02/16 22:00:34 UTC
[12/21] hadoop git commit: YARN-7920. Simplify configuration for
PlacementConstraints. Contributed by Wangda Tan.
YARN-7920. Simplify configuration for PlacementConstraints. Contributed by Wangda Tan.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0b489e56
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0b489e56
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0b489e56
Branch: refs/heads/HDFS-12996
Commit: 0b489e564ce5a50324a530e29c18aa8a75276c50
Parents: 4747395
Author: Konstantinos Karanasos <kk...@apache.org>
Authored: Thu Feb 15 14:23:27 2018 -0800
Committer: Konstantinos Karanasos <kk...@apache.org>
Committed: Thu Feb 15 14:23:38 2018 -0800
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 54 ++-
.../TestAMRMClientPlacementConstraints.java | 3 +-
.../src/main/resources/yarn-default.xml | 10 +-
.../ApplicationMasterService.java | 46 ++-
.../scheduler/capacity/CapacityScheduler.java | 13 -
.../CapacitySchedulerConfiguration.java | 5 -
.../processor/AbstractPlacementProcessor.java | 96 +++++
.../processor/DisabledPlacementProcessor.java | 77 ++++
.../processor/PlacementConstraintProcessor.java | 340 +++++++++++++++++
.../processor/PlacementProcessor.java | 377 -------------------
.../processor/SchedulerPlacementProcessor.java | 55 +++
...apacitySchedulerSchedulingRequestUpdate.java | 4 +
...estSchedulingRequestContainerAllocation.java | 8 +-
...hedulingRequestContainerAllocationAsync.java | 4 +-
.../scheduler/capacity/TestUtils.java | 4 +-
.../constraint/TestPlacementProcessor.java | 12 +-
.../src/site/markdown/PlacementConstraints.md | 136 +++++++
.../site/markdown/PlacementConstraints.md.vm | 149 --------
18 files changed, 818 insertions(+), 575 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 118f9fb..6677478 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -532,11 +532,57 @@ public class YarnConfiguration extends Configuration {
public static final String RM_SCHEDULER =
RM_PREFIX + "scheduler.class";
- /** Enable rich placement constraints. */
- public static final String RM_PLACEMENT_CONSTRAINTS_ENABLED =
- RM_PREFIX + "placement-constraints.enabled";
+ /**
+ * Specify which handler will be used to process PlacementConstraints.
+ * For details on PlacementConstraints, please refer to
+ * {@link org.apache.hadoop.yarn.api.resource.PlacementConstraint}.
+ */
+ @Private
+ public static final String RM_PLACEMENT_CONSTRAINTS_HANDLER =
+ RM_PREFIX + "placement-constraints.handler";
+
+ /**
+ * This handler rejects all allocate calls made by an application, if they
+ * contain a {@link org.apache.hadoop.yarn.api.records.SchedulingRequest}.
+ */
+ @Private
+ public static final String DISABLED_RM_PLACEMENT_CONSTRAINTS_HANDLER =
+ "disabled";
- public static final boolean DEFAULT_RM_PLACEMENT_CONSTRAINTS_ENABLED = false;
+ /**
+ * Using this handler, the placement of containers with constraints is
+ * determined as a pre-processing step before the capacity or the fair
+ * scheduler is called. Once the placement is decided, the capacity/fair
+ * scheduler is invoked to perform the actual allocation. The advantage of
+ * this approach is that it supports all constraint types (affinity,
+ * anti-affinity, cardinality). Moreover, it considers multiple containers at
+ * a time, which lets it satisfy more constraints than a container-at-a-time
+ * approach can achieve. As it sits outside the main scheduler, it can be used
+ * by both the capacity and fair schedulers. Note that at the moment it does
+ * not account for task priorities within an application, given that such
+ * priorities might be conflicting with the placement constraints.
+ */
+ @Private
+ public static final String PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER =
+ "placement-processor";
+
+ /**
+ * Using this handler, containers with constraints will be placed by the main
+ * scheduler. If the configured RM scheduler
+ * {@code yarn.resourcemanager.scheduler.class}
+ * cannot handle placement constraints, the corresponding SchedulingRequests
+ * will be rejected. As of now, only the capacity scheduler supports
+ * SchedulingRequests. In particular, it currently supports anti-affinity
+ * constraints (no affinity or cardinality) and places one container at a
+ * time. The advantage of this handler compared to the placement-processor is
+ * that it follows the same ordering rules for queues (sorted by utilization,
+ * priority) and apps (sorted by FIFO/fairness/priority) as the ones followed
+ * by the main scheduler.
+ */
+ @Private
+ public static final String
+ SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER =
+ "scheduler";
/** Placement Algorithm. */
public static final String RM_PLACEMENT_CONSTRAINTS_ALGORITHM_CLASS =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java
index fdc8d58..0e88299 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java
@@ -65,7 +65,8 @@ public class TestAMRMClientPlacementConstraints extends BaseAMRMClientTest {
// mismatches between client and server
teardown();
conf = new YarnConfiguration();
- conf.setBoolean(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true);
+ conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+ YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
createClusterAndStartApplication(conf);
AMRMClient<AMRMClient.ContainerRequest> amClient =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 509a040..adf8d8a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -131,9 +131,13 @@
</property>
<property>
- <description>Enable Constraint Placement.</description>
- <name>yarn.resourcemanager.placement-constraints.enabled</name>
- <value>false</value>
+ <description>
+ Specify which handler will be used to process PlacementConstraints.
+ Acceptable values are: `placement-processor`, `scheduler` and `disabled`.
+ For a detailed explanation of these values, please refer to the documentation.
+ </description>
+ <name>yarn.resourcemanager.placement-constraints.handler</name>
+ <value>disabled</value>
</property>
<property>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index aa1177d..ae28879 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.PlacementProcessor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
@@ -67,6 +66,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.AbstractPlacementProcessor;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.DisabledPlacementProcessor;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.PlacementConstraintProcessor;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.SchedulerPlacementProcessor;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
@@ -118,20 +121,47 @@ public class ApplicationMasterService extends AbstractService implements
initializeProcessingChain(conf);
}
+ private void addPlacementConstraintHandler(Configuration conf) {
+ String placementConstraintsHandler =
+ conf.get(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+ YarnConfiguration.DISABLED_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+ if (placementConstraintsHandler
+ .equals(YarnConfiguration.DISABLED_RM_PLACEMENT_CONSTRAINTS_HANDLER)) {
+ LOG.info(YarnConfiguration.DISABLED_RM_PLACEMENT_CONSTRAINTS_HANDLER
+ + " placement handler will be used, all scheduling requests will "
+ + "be rejected.");
+ amsProcessingChain.addProcessor(new DisabledPlacementProcessor());
+ } else if (placementConstraintsHandler
+ .equals(YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER)) {
+ LOG.info(YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER
+ + " placement handler will be used. Scheduling requests will be "
+ + "handled by the placement constraint processor");
+ amsProcessingChain.addProcessor(new PlacementConstraintProcessor());
+ } else if (placementConstraintsHandler
+ .equals(YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER)) {
+ LOG.info(YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER
+ + " placement handler will be used. Scheduling requests will be "
+ + "handled by the main scheduler.");
+ amsProcessingChain.addProcessor(new SchedulerPlacementProcessor());
+ }
+ }
+
private void initializeProcessingChain(Configuration conf) {
amsProcessingChain.init(rmContext, null);
- boolean enablePlacementConstraints = conf.getBoolean(
- YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED,
- YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_ENABLED);
- if (enablePlacementConstraints) {
- amsProcessingChain.addProcessor(new PlacementProcessor());
- }
+ addPlacementConstraintHandler(conf);
+
List<ApplicationMasterServiceProcessor> processors = getProcessorList(conf);
if (processors != null) {
Collections.reverse(processors);
for (ApplicationMasterServiceProcessor p : processors) {
// Ensure only single instance of PlacementProcessor is included
- if (enablePlacementConstraints && p instanceof PlacementProcessor) {
+ if (p instanceof AbstractPlacementProcessor) {
+ LOG.warn("Found PlacementProcessor=" + p.getClass().getCanonicalName()
+ + " defined in "
+ + YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS
+ + ", however PlacementProcessor handler should be configured "
+ + "by using " + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER
+ + ", this processor will be ignored.");
continue;
}
this.amsProcessingChain.addProcessor(p);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index cd9d1373..ddab0c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -63,7 +63,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
@@ -1098,18 +1097,6 @@ public class CapacityScheduler extends
return EMPTY_ALLOCATION;
}
- if ((!getConfiguration().getBoolean(
- CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
- CapacitySchedulerConfiguration.DEFAULT_SCHEDULING_REQUEST_ALLOWED))
- && schedulingRequests != null && (!schedulingRequests.isEmpty())) {
- throw new SchedulerInvalidResoureRequestException(
- "Application attempt:" + applicationAttemptId
- + " is using SchedulingRequest, which is disabled. Please update "
- + CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED
- + " to true in capacity-scheduler.xml in order to use this "
- + "feature.");
- }
-
// The allocate may be the leftover from previous attempt, and it will
// impact current attempt, such as confuse the request and allocation for
// current attempt's AM container.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 00733a1..e609be9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -77,11 +77,6 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
@Private
public static final String PREFIX = "yarn.scheduler.capacity.";
-
- @Private
- public static final String SCHEDULING_REQUEST_ALLOWED =
- PREFIX + "scheduling-request.allowed";
- public static final boolean DEFAULT_SCHEDULING_REQUEST_ALLOWED = false;
@Private
public static final String DOT = ".";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/AbstractPlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/AbstractPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/AbstractPlacementProcessor.java
new file mode 100644
index 0000000..96ae623
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/AbstractPlacementProcessor.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
+
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Base class for all PlacementProcessors.
+ */
+public abstract class AbstractPlacementProcessor implements
+ ApplicationMasterServiceProcessor{
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AbstractPlacementProcessor.class);
+
+ protected ApplicationMasterServiceProcessor nextAMSProcessor;
+ protected AbstractYarnScheduler scheduler;
+ private PlacementConstraintManager constraintManager;
+
+ @Override
+ public void init(ApplicationMasterServiceContext amsContext,
+ ApplicationMasterServiceProcessor nextProcessor) {
+ this.nextAMSProcessor = nextProcessor;
+ this.scheduler =
+ (AbstractYarnScheduler) ((RMContextImpl) amsContext).getScheduler();
+ this.constraintManager =
+ ((RMContextImpl)amsContext).getPlacementConstraintManager();
+ }
+
+ @Override
+ public void registerApplicationMaster(
+ ApplicationAttemptId applicationAttemptId,
+ RegisterApplicationMasterRequest request,
+ RegisterApplicationMasterResponse response)
+ throws IOException, YarnException {
+ Map<Set<String>, PlacementConstraint> appPlacementConstraints =
+ request.getPlacementConstraints();
+ processPlacementConstraints(applicationAttemptId.getApplicationId(),
+ appPlacementConstraints);
+ nextAMSProcessor.registerApplicationMaster(applicationAttemptId, request,
+ response);
+ }
+
+ private void processPlacementConstraints(ApplicationId applicationId,
+ Map<Set<String>, PlacementConstraint> appPlacementConstraints) {
+ if (appPlacementConstraints != null && !appPlacementConstraints.isEmpty()) {
+ LOG.info("Constraints added for application [{}] against tags [{}]",
+ applicationId, appPlacementConstraints);
+ constraintManager.registerApplication(
+ applicationId, appPlacementConstraints);
+ }
+ }
+
+ @Override
+ public void finishApplicationMaster(ApplicationAttemptId applicationAttemptId,
+ FinishApplicationMasterRequest request,
+ FinishApplicationMasterResponse response) {
+ constraintManager.unregisterApplication(
+ applicationAttemptId.getApplicationId());
+ this.nextAMSProcessor.finishApplicationMaster(applicationAttemptId, request,
+ response);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/DisabledPlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/DisabledPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/DisabledPlacementProcessor.java
new file mode 100644
index 0000000..0d093a7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/DisabledPlacementProcessor.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Processor that rejects all SchedulingRequests.
+ */
+public class DisabledPlacementProcessor extends AbstractPlacementProcessor {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DisabledPlacementProcessor.class);
+
+ @Override
+ public void registerApplicationMaster(
+ ApplicationAttemptId applicationAttemptId,
+ RegisterApplicationMasterRequest request,
+ RegisterApplicationMasterResponse response)
+ throws IOException, YarnException {
+ if (request.getPlacementConstraints() != null && !request
+ .getPlacementConstraints().isEmpty()) {
+ String message = "Found non empty placement constraints map in "
+ + "RegisterApplicationMasterRequest for application="
+ + applicationAttemptId.toString() + ", but the configured "
+ + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER
+ + " cannot handle placement constraints. Rejecting this "
+ + "registerApplicationMaster operation";
+ LOG.warn(message);
+ throw new YarnException(message);
+ }
+ nextAMSProcessor.registerApplicationMaster(applicationAttemptId, request,
+ response);
+ }
+
+ @Override
+ public void allocate(ApplicationAttemptId appAttemptId,
+ AllocateRequest request, AllocateResponse response) throws YarnException {
+ if (request.getSchedulingRequests() != null && !request
+ .getSchedulingRequests().isEmpty()) {
+ String message = "Found non empty SchedulingRequest in "
+ + "AllocateRequest for application="
+ + appAttemptId.toString() + ", but the configured "
+ + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER
+ + " cannot handle placement constraints. Rejecting this "
+ + "allocate operation";
+ LOG.warn(message);
+ throw new YarnException(message);
+ }
+ nextAMSProcessor.allocate(appAttemptId, request, response);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java
new file mode 100644
index 0000000..f089a19
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
+
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
+import org.apache.hadoop.yarn.api.records.RejectionReason;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+/**
+ * An ApplicationMasterServiceProcessor that performs Constrained placement of
+ * Scheduling Requests. It does the following:
+ * 1. All initialization.
+ * 2. Intercepts placement constraints from the register call and adds them
+ * to the placement constraint manager.
+ * 3. Dispatches Scheduling Requests to the Planner.
+ */
+public class PlacementConstraintProcessor extends AbstractPlacementProcessor {
+
+ /**
+ * Wrapper over the SchedulingResponse that wires in the placement attempt
+ * and last attempted Node.
+ */
+ static final class Response extends SchedulingResponse {
+
+ // Placement attempt the request was on when this outcome was recorded;
+ // compared against retryAttempts to choose between retry and rejection.
+ private final int placementAttempt;
+ // Node the allocation was attempted on. Null for the responses built in
+ // handleRejectedRequests, where no node was attempted.
+ private final SchedulerNode attemptedNode;
+
+ // Private: instances are only created by the enclosing processor.
+ private Response(boolean isSuccess, ApplicationId applicationId,
+ SchedulingRequest schedulingRequest, int placementAttempt,
+ SchedulerNode attemptedNode) {
+ super(isSuccess, applicationId, schedulingRequest);
+ this.placementAttempt = placementAttempt;
+ this.attemptedNode = attemptedNode;
+ }
+ }
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PlacementConstraintProcessor.class);
+
+ // Fixed-size pool (created in init) that runs the per-node allocation
+ // attempts submitted by schedulePlacedRequests.
+ private ExecutorService schedulingThreadPool;
+ // Upper bound read from configuration in init: a failed request is retried
+ // only while its placement attempt is below this value.
+ private int retryAttempts;
+ // Per-application batches waiting to be re-dispatched on the next allocate
+ // call. The map is concurrent, but each value is a plain ArrayList that is
+ // mutated under synchronized (list).
+ private Map<ApplicationId, List<BatchedRequests>> requestsToRetry =
+ new ConcurrentHashMap<>();
+ // Per-application requests that will not be retried any more; they are
+ // reported to the AM and cleared in handleRejectedRequests. Same locking
+ // convention as requestsToRetry.
+ private Map<ApplicationId, List<SchedulingRequest>> requestsToReject =
+ new ConcurrentHashMap<>();
+
+ // Iterator type passed to every BatchedRequests created by this class.
+ private BatchedRequests.IteratorType iteratorType;
+ // Receives batches for placement and hands back placed/rejected requests.
+ private PlacementDispatcher placementDispatcher;
+
+
+ /**
+ * Initializes the processor from the RM configuration: placement
+ * algorithm, request iterator type, algorithm and scheduler pool sizes,
+ * and the number of retry attempts.
+ *
+ * @param amsContext service context; cast to RMContextImpl to reach the
+ * YARN configuration, so any other implementation fails with a
+ * ClassCastException.
+ * @param nextProcessor next processor in the chain, passed to the
+ * superclass init.
+ * @throws YarnRuntimeException if the configured iterator name is not a
+ * BatchedRequests.IteratorType constant.
+ */
+ @Override
+ public void init(ApplicationMasterServiceContext amsContext,
+ ApplicationMasterServiceProcessor nextProcessor) {
+ LOG.info("Initializing Constraint Placement Processor:");
+ super.init(amsContext, nextProcessor);
+
+ // Only the first class is considered - even if a comma separated
+ // list is provided. (This is for simplicity, since getInstances does a
+ // lot of good things by handling things correctly)
+ List<ConstraintPlacementAlgorithm> instances =
+ ((RMContextImpl) amsContext).getYarnConfiguration().getInstances(
+ YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_CLASS,
+ ConstraintPlacementAlgorithm.class);
+ ConstraintPlacementAlgorithm algorithm = null;
+ if (instances != null && !instances.isEmpty()) {
+ algorithm = instances.get(0);
+ } else {
+ // Nothing configured: fall back to the default algorithm.
+ algorithm = new DefaultPlacementAlgorithm();
+ }
+ LOG.info("Placement Algorithm [{}]", algorithm.getClass().getName());
+
+ // The iterator type is configured by enum constant name; SERIAL is used
+ // when the key is unset.
+ String iteratorName = ((RMContextImpl) amsContext).getYarnConfiguration()
+ .get(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_ITERATOR,
+ BatchedRequests.IteratorType.SERIAL.name());
+ LOG.info("Placement Algorithm Iterator[{}]", iteratorName);
+ try {
+ iteratorType = BatchedRequests.IteratorType.valueOf(iteratorName);
+ } catch (IllegalArgumentException e) {
+ // The offending name is only carried by the wrapped cause.
+ throw new YarnRuntimeException(
+ "Could not instantiate Placement Algorithm Iterator: ", e);
+ }
+
+ int algoPSize = ((RMContextImpl) amsContext).getYarnConfiguration().getInt(
+ YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE,
+ YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE);
+ this.placementDispatcher = new PlacementDispatcher();
+ this.placementDispatcher.init(
+ ((RMContextImpl)amsContext), algorithm, algoPSize);
+ LOG.info("Planning Algorithm pool size [{}]", algoPSize);
+
+ // Separate pool for the allocation attempts on the placed nodes.
+ // NOTE(review): no shutdown of this pool is visible in this class.
+ int schedPSize = ((RMContextImpl) amsContext).getYarnConfiguration().getInt(
+ YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE,
+ YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE);
+ this.schedulingThreadPool = Executors.newFixedThreadPool(schedPSize);
+ LOG.info("Scheduler pool size [{}]", schedPSize);
+
+ // Number of times a request that is not satisfied by the scheduler
+ // can be retried.
+ this.retryAttempts =
+ ((RMContextImpl) amsContext).getYarnConfiguration().getInt(
+ YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS);
+ LOG.info("Num retry attempts [{}]", this.retryAttempts);
+ }
+
+ /**
+ * Handles the SchedulingRequests of an allocate call in this processor
+ * instead of passing them to the scheduler. In order: dispatches the new
+ * requests for placement, re-dispatches batches queued for retry, submits
+ * allocation attempts for the placed requests currently held by the
+ * dispatcher, strips the SchedulingRequests from the request, invokes the
+ * next processor, and finally adds rejected requests to the response.
+ *
+ * @param appAttemptId attempt issuing the allocate call.
+ * @param request allocate request; its SchedulingRequest list is replaced
+ * by an empty list before the next processor sees it. A null list is
+ * treated as empty.
+ * @param response allocate response; rejected SchedulingRequests are added
+ * to it after the next processor has run.
+ * @throws YarnException as thrown by the next processor's allocate.
+ */
+ @Override
+ public void allocate(ApplicationAttemptId appAttemptId,
+ AllocateRequest request, AllocateResponse response) throws YarnException {
+ // Copy the scheduling request since we will clear it later after sending
+ // to dispatcher. A null list is treated as "no scheduling requests"
+ // instead of failing the whole allocate call with a
+ // NullPointerException in the ArrayList copy constructor.
+ List<SchedulingRequest> incomingRequests = request.getSchedulingRequests();
+ List<SchedulingRequest> schedulingRequests = incomingRequests == null
+ ? new ArrayList<>() : new ArrayList<>(incomingRequests);
+ dispatchRequestsForPlacement(appAttemptId, schedulingRequests);
+ reDispatchRetryableRequests(appAttemptId);
+ schedulePlacedRequests(appAttemptId);
+
+ // Remove SchedulingRequest from AllocateRequest to avoid SchedulingRequest
+ // added to scheduler.
+ request.setSchedulingRequests(Collections.emptyList());
+
+ nextAMSProcessor.allocate(appAttemptId, request, response);
+
+ // Runs after the next processor so the rejections are added to the
+ // response it has already filled in.
+ handleRejectedRequests(appAttemptId, response);
+ }
+
+ /**
+ * Normalizes the resources of the given requests and dispatches them to
+ * the placement dispatcher as a single batch with placement attempt 1.
+ * Does nothing for a null or empty list.
+ *
+ * @param appAttemptId attempt the requests belong to; only its application
+ * id is used.
+ * @param schedulingRequests requests to place; their ResourceSizing is
+ * modified in place.
+ */
+ private void dispatchRequestsForPlacement(ApplicationAttemptId appAttemptId,
+ List<SchedulingRequest> schedulingRequests) {
+ if (schedulingRequests != null && !schedulingRequests.isEmpty()) {
+ // Normalize the Requests before dispatching
+ schedulingRequests.forEach(req -> {
+ Resource reqResource = req.getResourceSizing().getResources();
+ req.getResourceSizing()
+ .setResources(this.scheduler.getNormalizedResource(reqResource));
+ });
+ // First placement attempt for these requests, hence the literal 1.
+ this.placementDispatcher.dispatch(new BatchedRequests(iteratorType,
+ appAttemptId.getApplicationId(), schedulingRequests, 1));
+ }
+ }
+
+ /**
+ * Re-dispatches every batch queued for retry for the application of the
+ * given attempt and then empties the queue. Does nothing if the
+ * application has no retry list.
+ *
+ * @param appAttId attempt whose application's retry batches are
+ * re-dispatched.
+ */
+ private void reDispatchRetryableRequests(ApplicationAttemptId appAttId) {
+ List<BatchedRequests> reqsToRetry =
+ this.requestsToRetry.get(appAttId.getApplicationId());
+ if (reqsToRetry == null) {
+ return;
+ }
+ // The list is a plain ArrayList that handleSchedulingResponse mutates
+ // under this same monitor, possibly from scheduler pool threads, so all
+ // access happens inside the synchronized block. An empty list simply
+ // makes the loop and the clear no-ops.
+ synchronized (reqsToRetry) {
+ for (BatchedRequests bReq : reqsToRetry) {
+ this.placementDispatcher.dispatch(bReq);
+ }
+ reqsToRetry.clear();
+ }
+ }
+
+ /**
+ * Pulls the placed requests the dispatcher holds for the application and,
+ * for every node of every placed request, submits a task to the scheduling
+ * pool that attempts the allocation on that node and reports the outcome
+ * to handleSchedulingResponse.
+ *
+ * @param appAttemptId attempt on whose behalf the allocations are
+ * attempted.
+ */
+ private void schedulePlacedRequests(ApplicationAttemptId appAttemptId) {
+ ApplicationId applicationId = appAttemptId.getApplicationId();
+ List<PlacedSchedulingRequest> placedSchedulingRequests =
+ this.placementDispatcher.pullPlacedRequests(applicationId);
+ for (PlacedSchedulingRequest placedReq : placedSchedulingRequests) {
+ SchedulingRequest sReq = placedReq.getSchedulingRequest();
+ for (SchedulerNode node : placedReq.getNodes()) {
+ // Each node gets its own copy of the request, built with a
+ // ResourceSizing that carries over only the resources of the original.
+ final SchedulingRequest sReqClone =
+ SchedulingRequest.newInstance(sReq.getAllocationRequestId(),
+ sReq.getPriority(), sReq.getExecutionType(),
+ sReq.getAllocationTags(),
+ ResourceSizing.newInstance(
+ sReq.getResourceSizing().getResources()),
+ sReq.getPlacementConstraint());
+ // NOTE(review): this lookup does not depend on the loop variables and
+ // its result is not checked before being handed to the scheduler.
+ SchedulerApplicationAttempt applicationAttempt =
+ this.scheduler.getApplicationAttempt(appAttemptId);
+ Runnable task = () -> {
+ boolean success =
+ scheduler.attemptAllocationOnNode(
+ applicationAttempt, sReqClone, node);
+ if (!success) {
+ LOG.warn("Unsuccessful allocation attempt [{}] for [{}]",
+ placedReq.getPlacementAttempt(), sReqClone);
+ }
+ // Reported for success and failure alike; the handler decides
+ // whether to retry or reject.
+ handleSchedulingResponse(
+ new Response(success, applicationId, sReqClone,
+ placedReq.getPlacementAttempt(), node));
+ };
+ // NOTE(review): the Future returned by submit is discarded, so an
+ // exception thrown inside the task is never observed here and the
+ // request would be neither retried nor rejected.
+ this.schedulingThreadPool.submit(task);
+ }
+ }
+ }
+
+ /**
+ * Processes the requests that could not be satisfied for the application
+ * of the given attempt. Requests rejected by the placement algorithm are
+ * queued for retry while their placement attempt is below retryAttempts
+ * and otherwise added to the response with reason COULD_NOT_PLACE_ON_NODE.
+ * Requests queued in requestsToReject are added to the response with
+ * reason COULD_NOT_SCHEDULE_ON_NODE and the queue is emptied.
+ *
+ * @param appAttemptId attempt whose application's rejections are handled.
+ * @param response allocate response the rejected requests are added to.
+ */
+ private void handleRejectedRequests(ApplicationAttemptId appAttemptId,
+ AllocateResponse response) {
+ List<SchedulingRequestWithPlacementAttempt> rejectedAlgoRequests =
+ this.placementDispatcher.pullRejectedRequests(
+ appAttemptId.getApplicationId());
+ if (rejectedAlgoRequests != null && !rejectedAlgoRequests.isEmpty()) {
+ LOG.warn("Following requests of [{}] were rejected by" +
+ " the PlacementAlgorithmOutput Algorithm: {}",
+ appAttemptId.getApplicationId(), rejectedAlgoRequests);
+ // Still under the retry limit: feed back as a failed response with no
+ // attempted node so the request is queued for another placement round.
+ rejectedAlgoRequests.stream()
+ .filter(req -> req.getPlacementAttempt() < retryAttempts)
+ .forEach(req -> handleSchedulingResponse(
+ new Response(false, appAttemptId.getApplicationId(),
+ req.getSchedulingRequest(), req.getPlacementAttempt(),
+ null)));
+ // Retry limit reached: report to the application master.
+ ApplicationMasterServiceUtils.addToRejectedSchedulingRequests(response,
+ rejectedAlgoRequests.stream()
+ .filter(req -> req.getPlacementAttempt() >= retryAttempts)
+ .map(sr -> RejectedSchedulingRequest.newInstance(
+ RejectionReason.COULD_NOT_PLACE_ON_NODE,
+ sr.getSchedulingRequest()))
+ .collect(Collectors.toList()));
+ }
+ List<SchedulingRequest> rejectedRequests =
+ this.requestsToReject.get(appAttemptId.getApplicationId());
+ if (rejectedRequests != null) {
+ // The list is a plain ArrayList that handleSchedulingResponse appends
+ // to under this same monitor, possibly from scheduler pool threads, so
+ // the emptiness check is done while holding the monitor as well.
+ synchronized (rejectedRequests) {
+ if (!rejectedRequests.isEmpty()) {
+ LOG.warn("Following requests of [{}] exhausted all retry attempts " +
+ "trying to schedule on placed node: {}",
+ appAttemptId.getApplicationId(), rejectedRequests);
+ ApplicationMasterServiceUtils.addToRejectedSchedulingRequests(response,
+ rejectedRequests.stream()
+ .map(sr -> RejectedSchedulingRequest.newInstance(
+ RejectionReason.COULD_NOT_SCHEDULE_ON_NODE, sr))
+ .collect(Collectors.toList()));
+ rejectedRequests.clear();
+ }
+ }
+ }
+ }
+
+ /**
+ * Drops the per-application state kept by this processor (dispatcher
+ * state, pending rejections and pending retries) and then delegates to the
+ * superclass implementation.
+ *
+ * @param appAttemptId attempt that is finishing; its application id keys
+ * the state that is removed.
+ * @param request finish request, passed through to the superclass.
+ * @param response finish response, passed through to the superclass.
+ */
+ @Override
+ public void finishApplicationMaster(ApplicationAttemptId appAttemptId,
+ FinishApplicationMasterRequest request,
+ FinishApplicationMasterResponse response) {
+ placementDispatcher.clearApplicationState(appAttemptId.getApplicationId());
+ // NOTE(review): a task still running on the scheduling pool can re-create
+ // these entries through computeIfAbsent in handleSchedulingResponse after
+ // they have been removed here.
+ requestsToReject.remove(appAttemptId.getApplicationId());
+ requestsToRetry.remove(appAttemptId.getApplicationId());
+ super.finishApplicationMaster(appAttemptId, request, response);
+ }
+
+ /**
+ * Records the outcome of one allocation or placement attempt. A failed
+ * response whose placement attempt is below retryAttempts is queued for
+ * retry; any other failed response is queued for rejection. A successful
+ * response needs no further action.
+ *
+ * @param schedulerResponse outcome to record; must be a Response instance
+ * because it is cast to read the placement attempt.
+ */
+ private void handleSchedulingResponse(SchedulingResponse schedulerResponse) {
+ int placementAttempt = ((Response)schedulerResponse).placementAttempt;
+ // Retry this placement as it is not successful and we are still
+ // under max retry. The req is batched with other unsuccessful
+ // requests from the same app
+ if (!schedulerResponse.isSuccess() && placementAttempt < retryAttempts) {
+ List<BatchedRequests> reqsToRetry =
+ requestsToRetry.computeIfAbsent(
+ schedulerResponse.getApplicationId(),
+ k -> new ArrayList<>());
+ // The per-application list is not thread-safe; its own monitor guards
+ // it against concurrent scheduler tasks and the allocate path.
+ synchronized (reqsToRetry) {
+ addToRetryList(schedulerResponse, placementAttempt, reqsToRetry);
+ }
+ LOG.warn("Going to retry request for application [{}] after [{}]" +
+ " attempts: [{}]", schedulerResponse.getApplicationId(),
+ placementAttempt, schedulerResponse.getSchedulingRequest());
+ } else {
+ // Either successful (nothing to do) or failed with no retries left.
+ if (!schedulerResponse.isSuccess()) {
+ LOG.warn("Not retrying request for application [{}] after [{}]" +
+ " attempts: [{}]", schedulerResponse.getApplicationId(),
+ placementAttempt, schedulerResponse.getSchedulingRequest());
+ List<SchedulingRequest> reqsToReject =
+ requestsToReject.computeIfAbsent(
+ schedulerResponse.getApplicationId(),
+ k -> new ArrayList<>());
+ // Same locking convention as the retry list above.
+ synchronized (reqsToReject) {
+ reqsToReject.add(schedulerResponse.getSchedulingRequest());
+ }
+ }
+ }
+ }
+
+ /**
+ * Adds the request of a failed response to the retry batch for the next
+ * placement attempt (placementAttempt + 1), creating that batch if the
+ * list holds none, and blacklists the attempted node for the request's
+ * allocation tags. The only caller in this class invokes it while holding
+ * the monitor of reqsToRetry.
+ *
+ * @param schedulerResponse failed response; must be a Response instance.
+ * @param placementAttempt attempt number the failed response was on.
+ * @param reqsToRetry retry batches of the response's application.
+ */
+ private void addToRetryList(SchedulingResponse schedulerResponse,
+ int placementAttempt, List<BatchedRequests> reqsToRetry) {
+ boolean isAdded = false;
+ // Reuse an existing batch for the same next attempt so retries of one
+ // application are placed together.
+ for (BatchedRequests br : reqsToRetry) {
+ if (br.getPlacementAttempt() == placementAttempt + 1) {
+ br.addToBatch(schedulerResponse.getSchedulingRequest());
+ // NOTE(review): attemptedNode is null for responses that come from
+ // handleRejectedRequests; confirm addToBlacklist accepts null.
+ br.addToBlacklist(
+ schedulerResponse.getSchedulingRequest().getAllocationTags(),
+ ((Response) schedulerResponse).attemptedNode);
+ isAdded = true;
+ break;
+ }
+ }
+ if (!isAdded) {
+ // No batch for that attempt yet: start one with just this request.
+ BatchedRequests br = new BatchedRequests(iteratorType,
+ schedulerResponse.getApplicationId(),
+ Collections.singleton(schedulerResponse.getSchedulingRequest()),
+ placementAttempt + 1);
+ reqsToRetry.add(br);
+ br.addToBlacklist(
+ schedulerResponse.getSchedulingRequest().getAllocationTags(),
+ ((Response) schedulerResponse).attemptedNode);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
deleted file mode 100644
index 9ce38f4..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
-
-import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
-import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
-import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
-import org.apache.hadoop.yarn.api.records.RejectionReason;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceSizing;
-import org.apache.hadoop.yarn.api.records.SchedulingRequest;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingResponse;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.stream.Collectors;
-
-/**
- * An ApplicationMasterService Processor that performs Constrained placement of
- * Scheduling Requests. It does the following:
- * 1. All initialization.
- * 2. Intercepts placement constraints from the register call and adds it to
- * the placement constraint manager.
- * 3. Dispatches Scheduling Requests to the Planner.
- */
-public class PlacementProcessor implements ApplicationMasterServiceProcessor {
-
- /**
- * Wrapper over the SchedulingResponse that wires in the placement attempt
- * and last attempted Node.
- */
- static final class Response extends SchedulingResponse {
-
- private final int placementAttempt;
- private final SchedulerNode attemptedNode;
-
- private Response(boolean isSuccess, ApplicationId applicationId,
- SchedulingRequest schedulingRequest, int placementAttempt,
- SchedulerNode attemptedNode) {
- super(isSuccess, applicationId, schedulingRequest);
- this.placementAttempt = placementAttempt;
- this.attemptedNode = attemptedNode;
- }
- }
-
- private static final Logger LOG =
- LoggerFactory.getLogger(PlacementProcessor.class);
- private PlacementConstraintManager constraintManager;
- private ApplicationMasterServiceProcessor nextAMSProcessor;
-
- private AbstractYarnScheduler scheduler;
- private ExecutorService schedulingThreadPool;
- private int retryAttempts;
- private Map<ApplicationId, List<BatchedRequests>> requestsToRetry =
- new ConcurrentHashMap<>();
- private Map<ApplicationId, List<SchedulingRequest>> requestsToReject =
- new ConcurrentHashMap<>();
-
- private BatchedRequests.IteratorType iteratorType;
- private PlacementDispatcher placementDispatcher;
-
-
- @Override
- public void init(ApplicationMasterServiceContext amsContext,
- ApplicationMasterServiceProcessor nextProcessor) {
- LOG.info("Initializing Constraint Placement Processor:");
- this.nextAMSProcessor = nextProcessor;
- this.constraintManager =
- ((RMContextImpl)amsContext).getPlacementConstraintManager();
-
- this.scheduler =
- (AbstractYarnScheduler)((RMContextImpl)amsContext).getScheduler();
- // Only the first class is considered - even if a comma separated
- // list is provided. (This is for simplicity, since getInstances does a
- // lot of good things by handling things correctly)
- List<ConstraintPlacementAlgorithm> instances =
- ((RMContextImpl) amsContext).getYarnConfiguration().getInstances(
- YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_CLASS,
- ConstraintPlacementAlgorithm.class);
- ConstraintPlacementAlgorithm algorithm = null;
- if (instances != null && !instances.isEmpty()) {
- algorithm = instances.get(0);
- } else {
- algorithm = new DefaultPlacementAlgorithm();
- }
- LOG.info("Placement Algorithm [{}]", algorithm.getClass().getName());
-
- String iteratorName = ((RMContextImpl) amsContext).getYarnConfiguration()
- .get(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_ITERATOR,
- BatchedRequests.IteratorType.SERIAL.name());
- LOG.info("Placement Algorithm Iterator[{}]", iteratorName);
- try {
- iteratorType = BatchedRequests.IteratorType.valueOf(iteratorName);
- } catch (IllegalArgumentException e) {
- throw new YarnRuntimeException(
- "Could not instantiate Placement Algorithm Iterator: ", e);
- }
-
- int algoPSize = ((RMContextImpl) amsContext).getYarnConfiguration().getInt(
- YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE,
- YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE);
- this.placementDispatcher = new PlacementDispatcher();
- this.placementDispatcher.init(
- ((RMContextImpl)amsContext), algorithm, algoPSize);
- LOG.info("Planning Algorithm pool size [{}]", algoPSize);
-
- int schedPSize = ((RMContextImpl) amsContext).getYarnConfiguration().getInt(
- YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE,
- YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE);
- this.schedulingThreadPool = Executors.newFixedThreadPool(schedPSize);
- LOG.info("Scheduler pool size [{}]", schedPSize);
-
- // Number of times a request that is not satisfied by the scheduler
- // can be retried.
- this.retryAttempts =
- ((RMContextImpl) amsContext).getYarnConfiguration().getInt(
- YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS,
- YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS);
- LOG.info("Num retry attempts [{}]", this.retryAttempts);
- }
-
- @Override
- public void registerApplicationMaster(ApplicationAttemptId appAttemptId,
- RegisterApplicationMasterRequest request,
- RegisterApplicationMasterResponse response)
- throws IOException, YarnException {
- Map<Set<String>, PlacementConstraint> appPlacementConstraints =
- request.getPlacementConstraints();
- processPlacementConstraints(
- appAttemptId.getApplicationId(), appPlacementConstraints);
- nextAMSProcessor.registerApplicationMaster(appAttemptId, request, response);
- }
-
- private void processPlacementConstraints(ApplicationId applicationId,
- Map<Set<String>, PlacementConstraint> appPlacementConstraints) {
- if (appPlacementConstraints != null && !appPlacementConstraints.isEmpty()) {
- LOG.info("Constraints added for application [{}] against tags [{}]",
- applicationId, appPlacementConstraints);
- constraintManager.registerApplication(
- applicationId, appPlacementConstraints);
- }
- }
-
- @Override
- public void allocate(ApplicationAttemptId appAttemptId,
- AllocateRequest request, AllocateResponse response) throws YarnException {
- // Copy the scheduling request since we will clear it later after sending
- // to dispatcher
- List<SchedulingRequest> schedulingRequests =
- new ArrayList<>(request.getSchedulingRequests());
- dispatchRequestsForPlacement(appAttemptId, schedulingRequests);
- reDispatchRetryableRequests(appAttemptId);
- schedulePlacedRequests(appAttemptId);
-
- // Remove SchedulingRequest from AllocateRequest to avoid SchedulingRequest
- // added to scheduler.
- request.setSchedulingRequests(Collections.emptyList());
-
- nextAMSProcessor.allocate(appAttemptId, request, response);
-
- handleRejectedRequests(appAttemptId, response);
- }
-
- private void dispatchRequestsForPlacement(ApplicationAttemptId appAttemptId,
- List<SchedulingRequest> schedulingRequests) {
- if (schedulingRequests != null && !schedulingRequests.isEmpty()) {
- // Normalize the Requests before dispatching
- schedulingRequests.forEach(req -> {
- Resource reqResource = req.getResourceSizing().getResources();
- req.getResourceSizing()
- .setResources(this.scheduler.getNormalizedResource(reqResource));
- });
- this.placementDispatcher.dispatch(new BatchedRequests(iteratorType,
- appAttemptId.getApplicationId(), schedulingRequests, 1));
- }
- }
-
- private void reDispatchRetryableRequests(ApplicationAttemptId appAttId) {
- List<BatchedRequests> reqsToRetry =
- this.requestsToRetry.get(appAttId.getApplicationId());
- if (reqsToRetry != null && !reqsToRetry.isEmpty()) {
- synchronized (reqsToRetry) {
- for (BatchedRequests bReq: reqsToRetry) {
- this.placementDispatcher.dispatch(bReq);
- }
- reqsToRetry.clear();
- }
- }
- }
-
- private void schedulePlacedRequests(ApplicationAttemptId appAttemptId) {
- ApplicationId applicationId = appAttemptId.getApplicationId();
- List<PlacedSchedulingRequest> placedSchedulingRequests =
- this.placementDispatcher.pullPlacedRequests(applicationId);
- for (PlacedSchedulingRequest placedReq : placedSchedulingRequests) {
- SchedulingRequest sReq = placedReq.getSchedulingRequest();
- for (SchedulerNode node : placedReq.getNodes()) {
- final SchedulingRequest sReqClone =
- SchedulingRequest.newInstance(sReq.getAllocationRequestId(),
- sReq.getPriority(), sReq.getExecutionType(),
- sReq.getAllocationTags(),
- ResourceSizing.newInstance(
- sReq.getResourceSizing().getResources()),
- sReq.getPlacementConstraint());
- SchedulerApplicationAttempt applicationAttempt =
- this.scheduler.getApplicationAttempt(appAttemptId);
- Runnable task = () -> {
- boolean success =
- scheduler.attemptAllocationOnNode(
- applicationAttempt, sReqClone, node);
- if (!success) {
- LOG.warn("Unsuccessful allocation attempt [{}] for [{}]",
- placedReq.getPlacementAttempt(), sReqClone);
- }
- handleSchedulingResponse(
- new Response(success, applicationId, sReqClone,
- placedReq.getPlacementAttempt(), node));
- };
- this.schedulingThreadPool.submit(task);
- }
- }
- }
-
- private void handleRejectedRequests(ApplicationAttemptId appAttemptId,
- AllocateResponse response) {
- List<SchedulingRequestWithPlacementAttempt> rejectedAlgoRequests =
- this.placementDispatcher.pullRejectedRequests(
- appAttemptId.getApplicationId());
- if (rejectedAlgoRequests != null && !rejectedAlgoRequests.isEmpty()) {
- LOG.warn("Following requests of [{}] were rejected by" +
- " the PlacementAlgorithmOutput Algorithm: {}",
- appAttemptId.getApplicationId(), rejectedAlgoRequests);
- rejectedAlgoRequests.stream()
- .filter(req -> req.getPlacementAttempt() < retryAttempts)
- .forEach(req -> handleSchedulingResponse(
- new Response(false, appAttemptId.getApplicationId(),
- req.getSchedulingRequest(), req.getPlacementAttempt(),
- null)));
- ApplicationMasterServiceUtils.addToRejectedSchedulingRequests(response,
- rejectedAlgoRequests.stream()
- .filter(req -> req.getPlacementAttempt() >= retryAttempts)
- .map(sr -> RejectedSchedulingRequest.newInstance(
- RejectionReason.COULD_NOT_PLACE_ON_NODE,
- sr.getSchedulingRequest()))
- .collect(Collectors.toList()));
- }
- List<SchedulingRequest> rejectedRequests =
- this.requestsToReject.get(appAttemptId.getApplicationId());
- if (rejectedRequests != null && !rejectedRequests.isEmpty()) {
- synchronized (rejectedRequests) {
- LOG.warn("Following requests of [{}] exhausted all retry attempts " +
- "trying to schedule on placed node: {}",
- appAttemptId.getApplicationId(), rejectedRequests);
- ApplicationMasterServiceUtils.addToRejectedSchedulingRequests(response,
- rejectedRequests.stream()
- .map(sr -> RejectedSchedulingRequest.newInstance(
- RejectionReason.COULD_NOT_SCHEDULE_ON_NODE, sr))
- .collect(Collectors.toList()));
- rejectedRequests.clear();
- }
- }
- }
-
- @Override
- public void finishApplicationMaster(ApplicationAttemptId appAttemptId,
- FinishApplicationMasterRequest request,
- FinishApplicationMasterResponse response) {
- constraintManager.unregisterApplication(appAttemptId.getApplicationId());
- placementDispatcher.clearApplicationState(appAttemptId.getApplicationId());
- requestsToReject.remove(appAttemptId.getApplicationId());
- requestsToRetry.remove(appAttemptId.getApplicationId());
- nextAMSProcessor.finishApplicationMaster(appAttemptId, request, response);
- }
-
- private void handleSchedulingResponse(SchedulingResponse schedulerResponse) {
- int placementAttempt = ((Response)schedulerResponse).placementAttempt;
- // Retry this placement as it is not successful and we are still
- // under max retry. The req is batched with other unsuccessful
- // requests from the same app
- if (!schedulerResponse.isSuccess() && placementAttempt < retryAttempts) {
- List<BatchedRequests> reqsToRetry =
- requestsToRetry.computeIfAbsent(
- schedulerResponse.getApplicationId(),
- k -> new ArrayList<>());
- synchronized (reqsToRetry) {
- addToRetryList(schedulerResponse, placementAttempt, reqsToRetry);
- }
- LOG.warn("Going to retry request for application [{}] after [{}]" +
- " attempts: [{}]", schedulerResponse.getApplicationId(),
- placementAttempt, schedulerResponse.getSchedulingRequest());
- } else {
- if (!schedulerResponse.isSuccess()) {
- LOG.warn("Not retrying request for application [{}] after [{}]" +
- " attempts: [{}]", schedulerResponse.getApplicationId(),
- placementAttempt, schedulerResponse.getSchedulingRequest());
- List<SchedulingRequest> reqsToReject =
- requestsToReject.computeIfAbsent(
- schedulerResponse.getApplicationId(),
- k -> new ArrayList<>());
- synchronized (reqsToReject) {
- reqsToReject.add(schedulerResponse.getSchedulingRequest());
- }
- }
- }
- }
-
- private void addToRetryList(SchedulingResponse schedulerResponse,
- int placementAttempt, List<BatchedRequests> reqsToRetry) {
- boolean isAdded = false;
- for (BatchedRequests br : reqsToRetry) {
- if (br.getPlacementAttempt() == placementAttempt + 1) {
- br.addToBatch(schedulerResponse.getSchedulingRequest());
- br.addToBlacklist(
- schedulerResponse.getSchedulingRequest().getAllocationTags(),
- ((Response) schedulerResponse).attemptedNode);
- isAdded = true;
- break;
- }
- }
- if (!isAdded) {
- BatchedRequests br = new BatchedRequests(iteratorType,
- schedulerResponse.getApplicationId(),
- Collections.singleton(schedulerResponse.getSchedulingRequest()),
- placementAttempt + 1);
- reqsToRetry.add(br);
- br.addToBlacklist(
- schedulerResponse.getSchedulingRequest().getAllocationTags(),
- ((Response) schedulerResponse).attemptedNode);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SchedulerPlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SchedulerPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SchedulerPlacementProcessor.java
new file mode 100644
index 0000000..5332e34
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SchedulerPlacementProcessor.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Forwards allocate calls to the next processor so that the scheduler handles
+ * SchedulingRequests; rejects them if the scheduler is not a CapacityScheduler.
+ */
+public class SchedulerPlacementProcessor extends AbstractPlacementProcessor {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SchedulerPlacementProcessor.class);
+
+ @Override
+ public void allocate(ApplicationAttemptId appAttemptId,
+ AllocateRequest request, AllocateResponse response) throws YarnException {
+ if (request.getSchedulingRequests() != null
+ && !request.getSchedulingRequests().isEmpty()) {
+ if (!(scheduler instanceof CapacityScheduler)) {
+ String message = "Found non empty SchedulingRequest of "
+ + "AllocateRequest for application=" + appAttemptId.toString()
+ + ", however the configured scheduler="
+ + scheduler.getClass().getCanonicalName()
+ + " cannot handle placement constraints, rejecting this "
+ + "allocate operation";
+ LOG.warn(message);
+ throw new YarnException(message);
+ }
+ }
+ nextAMSProcessor.allocate(appAttemptId, request, response);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java
index 484d780..ee7e013 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java
@@ -50,6 +50,8 @@ public class TestCapacitySchedulerSchedulingRequestUpdate
Configuration conf = TestUtils.getConfigurationWithQueueLabels(
new Configuration(false));
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+ conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+ YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
mgr.init(conf);
@@ -166,6 +168,8 @@ public class TestCapacitySchedulerSchedulingRequestUpdate
Configuration conf = TestUtils.getConfigurationWithQueueLabels(
new Configuration(false));
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+ conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+ YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
mgr.init(conf);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
index b297f79..27d8661 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
@@ -58,8 +58,8 @@ public class TestSchedulingRequestContainerAllocation {
public void testIntraAppAntiAffinity() throws Exception {
Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
new Configuration());
- csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
- true);
+ csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+ YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
// inject node label manager
MockRM rm1 = new MockRM(csConf) {
@@ -141,8 +141,8 @@ public class TestSchedulingRequestContainerAllocation {
public void testIntraAppAntiAffinityWithMultipleTags() throws Exception {
Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
new Configuration());
- csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
- true);
+ csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+ YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
// inject node label manager
MockRM rm1 = new MockRM(csConf) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java
index fc1cb0d..d1d05dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java
@@ -57,13 +57,13 @@ public class TestSchedulingRequestContainerAllocationAsync {
private void testIntraAppAntiAffinityAsync(int numThreads) throws Exception {
Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
new Configuration());
- csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
- true);
csConf.setInt(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
numThreads);
csConf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+ ".scheduling-interval-ms", 0);
+ csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+ YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
// inject node label manager
MockRM rm1 = new MockRM(csConf) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index 7180e24..fae63be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -275,9 +275,7 @@ public class TestUtils {
public static Configuration getConfigurationWithQueueLabels(Configuration config) {
CapacitySchedulerConfiguration conf =
new CapacitySchedulerConfiguration(config);
- conf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
- true);
-
+
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b489e56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
index c4c0b5d..e129a75 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
@@ -86,8 +86,8 @@ public class TestPlacementProcessor {
YarnConfiguration conf = new YarnConfiguration(csConf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
- conf.setBoolean(
- YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true);
+ conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+ YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
conf.setInt(
YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, 1);
startRM(conf);
@@ -381,8 +381,8 @@ public class TestPlacementProcessor {
YarnConfiguration conf = new YarnConfiguration(csConf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
- conf.setBoolean(
- YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true);
+ conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+ YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
startRM(conf);
HashMap<NodeId, MockNM> nodes = new HashMap<>();
@@ -533,8 +533,8 @@ public class TestPlacementProcessor {
YarnConfiguration conf = new YarnConfiguration(csConf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
- conf.setBoolean(
- YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true);
+ conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+ YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
conf.setInt(
YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, 2);
startRM(conf);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org