You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2015/07/07 18:04:10 UTC
[03/24] hadoop git commit: YARN-3508. Prevent processing preemption
events on the main RM dispatcher. (Varun Saxena via wangda)
YARN-3508. Prevent processing preemption events on the main RM dispatcher. (Varun Saxena via wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0e4b0669
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0e4b0669
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0e4b0669
Branch: refs/heads/HDFS-7240
Commit: 0e4b06690ff51fbde3ab26f68fde8aeb32af69af
Parents: 152e5df
Author: Wangda Tan <wa...@apache.org>
Authored: Wed Jul 1 17:32:22 2015 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Wed Jul 1 17:32:22 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../server/resourcemanager/ResourceManager.java | 34 ---------
.../monitor/SchedulingEditPolicy.java | 6 +-
.../monitor/SchedulingMonitor.java | 3 +-
.../ProportionalCapacityPreemptionPolicy.java | 42 ++++++-----
.../scheduler/ContainerPreemptEvent.java | 8 +-
.../scheduler/ContainerPreemptEventType.java | 26 -------
.../scheduler/capacity/CapacityScheduler.java | 24 ++++++
.../scheduler/event/SchedulerEventType.java | 7 +-
.../resourcemanager/TestRMDispatcher.java | 79 ++++++++++++++++++++
...estProportionalCapacityPreemptionPolicy.java | 19 +++--
...pacityPreemptionPolicyForNodePartitions.java | 12 ++-
12 files changed, 161 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 3620a71..6839548 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -601,6 +601,9 @@ Release 2.7.2 - UNRELEASED
YARN-3793. Several NPEs when deleting local files on NM recovery (Varun
Saxena via jlowe)
+ YARN-3508. Prevent processing preemption events on the main RM dispatcher.
+ (Varun Saxena via wangda)
+
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 4153ba1..1b606b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -76,7 +76,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@@ -614,9 +613,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
SchedulingEditPolicy.class);
if (policies.size() > 0) {
- rmDispatcher.register(ContainerPreemptEventType.class,
- new RMContainerPreemptEventDispatcher(
- (PreemptableResourceScheduler) scheduler));
for (SchedulingEditPolicy policy : policies) {
LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
// periodically check whether we need to take action to guarantee
@@ -787,36 +783,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
@Private
- public static final class
- RMContainerPreemptEventDispatcher
- implements EventHandler<ContainerPreemptEvent> {
-
- private final PreemptableResourceScheduler scheduler;
-
- public RMContainerPreemptEventDispatcher(
- PreemptableResourceScheduler scheduler) {
- this.scheduler = scheduler;
- }
-
- @Override
- public void handle(ContainerPreemptEvent event) {
- ApplicationAttemptId aid = event.getAppId();
- RMContainer container = event.getContainer();
- switch (event.getType()) {
- case DROP_RESERVATION:
- scheduler.dropContainerReservation(container);
- break;
- case PREEMPT_CONTAINER:
- scheduler.preemptContainer(aid, container);
- break;
- case KILL_CONTAINER:
- scheduler.killContainer(container);
- break;
- }
- }
- }
-
- @Private
public static final class ApplicationAttemptEventDispatcher implements
EventHandler<RMAppAttemptEvent> {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java
index 1ebc19f..0d587d8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java
@@ -18,14 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
public interface SchedulingEditPolicy {
- public void init(Configuration config,
- EventHandler<ContainerPreemptEvent> dispatcher,
+ public void init(Configuration config, RMContext context,
PreemptableResourceScheduler scheduler);
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
index 1682f7d..d4c129b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
@@ -54,9 +54,8 @@ public class SchedulingMonitor extends AbstractService {
return scheduleEditPolicy;
}
- @SuppressWarnings("unchecked")
public void serviceInit(Configuration conf) throws Exception {
- scheduleEditPolicy.init(conf, rmContext.getDispatcher().getEventHandler(),
+ scheduleEditPolicy.init(conf, rmContext,
(PreemptableResourceScheduler) rmContext.getScheduler());
this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
super.serviceInit(conf);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 1f47b5f..5a20a6f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -38,13 +38,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -52,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -118,8 +118,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
public static final String NATURAL_TERMINATION_FACTOR =
"yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
- // the dispatcher to send preempt and kill events
- public EventHandler<ContainerPreemptEvent> dispatcher;
+ private RMContext rmContext;
private final Clock clock;
private double maxIgnoredOverCapacity;
@@ -141,20 +140,17 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
}
public ProportionalCapacityPreemptionPolicy(Configuration config,
- EventHandler<ContainerPreemptEvent> dispatcher,
- CapacityScheduler scheduler) {
- this(config, dispatcher, scheduler, new SystemClock());
+ RMContext context, CapacityScheduler scheduler) {
+ this(config, context, scheduler, new SystemClock());
}
public ProportionalCapacityPreemptionPolicy(Configuration config,
- EventHandler<ContainerPreemptEvent> dispatcher,
- CapacityScheduler scheduler, Clock clock) {
- init(config, dispatcher, scheduler);
+ RMContext context, CapacityScheduler scheduler, Clock clock) {
+ init(config, context, scheduler);
this.clock = clock;
}
- public void init(Configuration config,
- EventHandler<ContainerPreemptEvent> disp,
+ public void init(Configuration config, RMContext context,
PreemptableResourceScheduler sched) {
LOG.info("Preemption monitor:" + this.getClass().getCanonicalName());
assert null == scheduler : "Unexpected duplicate call to init";
@@ -163,7 +159,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
sched.getClass().getCanonicalName() + " not instance of " +
CapacityScheduler.class.getCanonicalName());
}
- dispatcher = disp;
+ rmContext = context;
scheduler = (CapacityScheduler) sched;
maxIgnoredOverCapacity = config.getDouble(MAX_IGNORED_OVER_CAPACITY, 0.1);
naturalTerminationFactor =
@@ -196,6 +192,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
* @param root the root of the CapacityScheduler queue hierarchy
* @param clusterResources the total amount of resources in the cluster
*/
+ @SuppressWarnings("unchecked")
private void containerBasedPreemptOrKill(CSQueue root,
Resource clusterResources) {
// All partitions to look at
@@ -248,8 +245,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
// preempt (or kill) the selected containers
for (Map.Entry<ApplicationAttemptId,Set<RMContainer>> e
: toPreempt.entrySet()) {
+ ApplicationAttemptId appAttemptId = e.getKey();
if (LOG.isDebugEnabled()) {
- LOG.debug("Send to scheduler: in app=" + e.getKey()
+ LOG.debug("Send to scheduler: in app=" + appAttemptId
+ " #containers-to-be-preempted=" + e.getValue().size());
}
for (RMContainer container : e.getValue()) {
@@ -257,13 +255,15 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
if (preempted.get(container) != null &&
preempted.get(container) + maxWaitTime < clock.getTime()) {
// kill it
- dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
- ContainerPreemptEventType.KILL_CONTAINER));
+ rmContext.getDispatcher().getEventHandler().handle(
+ new ContainerPreemptEvent(appAttemptId, container,
+ SchedulerEventType.KILL_CONTAINER));
preempted.remove(container);
} else {
//otherwise just send preemption events
- dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
- ContainerPreemptEventType.PREEMPT_CONTAINER));
+ rmContext.getDispatcher().getEventHandler().handle(
+ new ContainerPreemptEvent(appAttemptId, container,
+ SchedulerEventType.PREEMPT_CONTAINER));
if (preempted.get(container) == null) {
preempted.put(container, clock.getTime());
}
@@ -735,6 +735,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
* Given a target preemption for a specific application, select containers
* to preempt (after unreserving all reservation for that app).
*/
+ @SuppressWarnings("unchecked")
private void preemptFrom(FiCaSchedulerApp app,
Resource clusterResource, Map<String, Resource> resToObtainByPartition,
List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,
@@ -758,8 +759,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
clusterResource, preemptMap);
if (!observeOnly) {
- dispatcher.handle(new ContainerPreemptEvent(appId, c,
- ContainerPreemptEventType.DROP_RESERVATION));
+ rmContext.getDispatcher().getEventHandler().handle(
+ new ContainerPreemptEvent(
+ appId, c, SchedulerEventType.DROP_RESERVATION));
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java
index 8eba48d..7ab2758 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java
@@ -19,20 +19,20 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
/**
* Simple event class used to communicate containers unreservations, preemption, killing
*/
-public class ContainerPreemptEvent
- extends AbstractEvent<ContainerPreemptEventType> {
+public class ContainerPreemptEvent extends SchedulerEvent {
private final ApplicationAttemptId aid;
private final RMContainer container;
public ContainerPreemptEvent(ApplicationAttemptId aid, RMContainer container,
- ContainerPreemptEventType type) {
+ SchedulerEventType type) {
super(type);
this.aid = aid;
this.container = container;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java
deleted file mode 100644
index a70a836..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
-
-public enum ContainerPreemptEventType {
-
- DROP_RESERVATION,
- PREEMPT_CONTAINER,
- KILL_CONTAINER
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index f1d0f9c..141aa7f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
@@ -1346,6 +1347,29 @@ public class CapacityScheduler extends
RMContainerEventType.EXPIRE);
}
break;
+ case DROP_RESERVATION:
+ {
+ ContainerPreemptEvent dropReservationEvent = (ContainerPreemptEvent)event;
+ RMContainer container = dropReservationEvent.getContainer();
+ dropContainerReservation(container);
+ }
+ break;
+ case PREEMPT_CONTAINER:
+ {
+ ContainerPreemptEvent preemptContainerEvent =
+ (ContainerPreemptEvent)event;
+ ApplicationAttemptId aid = preemptContainerEvent.getAppId();
+ RMContainer containerToBePreempted = preemptContainerEvent.getContainer();
+ preemptContainer(aid, containerToBePreempted);
+ }
+ break;
+ case KILL_CONTAINER:
+ {
+ ContainerPreemptEvent killContainerEvent = (ContainerPreemptEvent)event;
+ RMContainer containerToBeKilled = killContainerEvent.getContainer();
+ killContainer(containerToBeKilled);
+ }
+ break;
default:
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
index 13aecb3..9de935b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
@@ -36,5 +36,10 @@ public enum SchedulerEventType {
APP_ATTEMPT_REMOVED,
// Source: ContainerAllocationExpirer
- CONTAINER_EXPIRED
+ CONTAINER_EXPIRED,
+
+ // Source: SchedulingEditPolicy
+ DROP_RESERVATION,
+ PREEMPT_CONTAINER,
+ KILL_CONTAINER
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
new file mode 100644
index 0000000..db7c96a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.SchedulerEventDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRMDispatcher {
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout=10000)
+ public void testSchedulerEventDispatcherForPreemptionEvents() {
+ AsyncDispatcher rmDispatcher = new AsyncDispatcher();
+ CapacityScheduler sched = spy(new CapacityScheduler());
+ YarnConfiguration conf = new YarnConfiguration();
+ SchedulerEventDispatcher schedulerDispatcher =
+ new SchedulerEventDispatcher(sched);
+ rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
+ rmDispatcher.init(conf);
+ rmDispatcher.start();
+ schedulerDispatcher.init(conf);
+ schedulerDispatcher.start();
+ try {
+ ApplicationAttemptId appAttemptId = mock(ApplicationAttemptId.class);
+ RMContainer container = mock(RMContainer.class);
+ ContainerPreemptEvent event1 = new ContainerPreemptEvent(
+ appAttemptId, container, SchedulerEventType.DROP_RESERVATION);
+ rmDispatcher.getEventHandler().handle(event1);
+ ContainerPreemptEvent event2 = new ContainerPreemptEvent(
+ appAttemptId, container, SchedulerEventType.KILL_CONTAINER);
+ rmDispatcher.getEventHandler().handle(event2);
+ ContainerPreemptEvent event3 = new ContainerPreemptEvent(
+ appAttemptId, container, SchedulerEventType.PREEMPT_CONTAINER);
+ rmDispatcher.getEventHandler().handle(event3);
+ // Wait for events to be processed by scheduler dispatcher.
+ Thread.sleep(1000);
+ verify(sched, times(3)).handle(any(SchedulerEvent.class));
+ verify(sched).dropContainerReservation(container);
+ verify(sched).preemptContainer(appAttemptId, container);
+ verify(sched).killContainer(container);
+ } catch (InterruptedException e) {
+ Assert.fail();
+ } finally {
+ schedulerDispatcher.stop();
+ rmDispatcher.stop();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 6c0ed6c..2c0c6d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -23,8 +23,8 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.KILL_CONTAINER;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.PREEMPT_CONTAINER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -66,7 +67,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
@@ -76,6 +76,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueu
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -104,7 +106,7 @@ public class TestProportionalCapacityPreemptionPolicy {
RMContext rmContext = null;
RMNodeLabelsManager lm = null;
CapacitySchedulerConfiguration schedConf = null;
- EventHandler<ContainerPreemptEvent> mDisp = null;
+ EventHandler<SchedulerEvent> mDisp = null;
ResourceCalculator rc = new DefaultResourceCalculator();
Resource clusterResources = null;
final ApplicationAttemptId appA = ApplicationAttemptId.newInstance(
@@ -164,6 +166,9 @@ public class TestProportionalCapacityPreemptionPolicy {
when(mCS.getRMContext()).thenReturn(rmContext);
when(rmContext.getNodeLabelManager()).thenReturn(lm);
mDisp = mock(EventHandler.class);
+ Dispatcher disp = mock(Dispatcher.class);
+ when(rmContext.getDispatcher()).thenReturn(disp);
+ when(disp.getEventHandler()).thenReturn(mDisp);
rand = new Random();
long seed = rand.nextLong();
System.out.println(name.getMethodName() + " SEED: " + seed);
@@ -866,12 +871,12 @@ public class TestProportionalCapacityPreemptionPolicy {
static class IsPreemptionRequestFor
extends ArgumentMatcher<ContainerPreemptEvent> {
private final ApplicationAttemptId appAttId;
- private final ContainerPreemptEventType type;
+ private final SchedulerEventType type;
IsPreemptionRequestFor(ApplicationAttemptId appAttId) {
this(appAttId, PREEMPT_CONTAINER);
}
IsPreemptionRequestFor(ApplicationAttemptId appAttId,
- ContainerPreemptEventType type) {
+ SchedulerEventType type) {
this.appAttId = appAttId;
this.type = type;
}
@@ -888,7 +893,7 @@ public class TestProportionalCapacityPreemptionPolicy {
ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
ProportionalCapacityPreemptionPolicy policy =
- new ProportionalCapacityPreemptionPolicy(conf, mDisp, mCS, mClock);
+ new ProportionalCapacityPreemptionPolicy(conf, rmContext, mCS, mClock);
ParentQueue mRoot = buildMockRootQueue(rand, qData);
when(mCS.getRootQueue()).thenReturn(mRoot);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e4b0669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
index e13320c..114769c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
@@ -50,13 +50,13 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
@@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueu
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -92,7 +93,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
private Configuration conf = null;
private CapacitySchedulerConfiguration csConf = null;
private CapacityScheduler cs = null;
- private EventHandler<ContainerPreemptEvent> mDisp = null;
+ private EventHandler<SchedulerEvent> mDisp = null;
private ProportionalCapacityPreemptionPolicy policy = null;
private Resource clusterResource = null;
@@ -125,11 +126,14 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
rmContext = mock(RMContext.class);
when(rmContext.getNodeLabelManager()).thenReturn(nlm);
+ Dispatcher disp = mock(Dispatcher.class);
+ when(rmContext.getDispatcher()).thenReturn(disp);
+ when(disp.getEventHandler()).thenReturn(mDisp);
csConf = new CapacitySchedulerConfiguration();
when(cs.getConfiguration()).thenReturn(csConf);
when(cs.getRMContext()).thenReturn(rmContext);
- policy = new ProportionalCapacityPreemptionPolicy(conf, mDisp, cs, mClock);
+ policy = new ProportionalCapacityPreemptionPolicy(conf, rmContext, cs, mClock);
partitionToResource = new HashMap<>();
nodeIdToSchedulerNodes = new HashMap<>();
nameToCSQueues = new HashMap<>();
@@ -828,7 +832,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
when(cs.getClusterResource()).thenReturn(clusterResource);
mockApplications(appsConfig);
- policy = new ProportionalCapacityPreemptionPolicy(conf, mDisp, cs, mClock);
+ policy = new ProportionalCapacityPreemptionPolicy(conf, rmContext, cs, mClock);
}
private void mockContainers(String containersConfig, ApplicationAttemptId attemptId,