You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2016/01/27 21:29:11 UTC
hadoop git commit: YARN-4462. FairScheduler: Disallow preemption from
a queue. (Tao Jie via kasha)
Repository: hadoop
Updated Branches:
refs/heads/trunk 2e8ab3d46 -> fb238d7e5
YARN-4462. FairScheduler: Disallow preemption from a queue. (Tao Jie via kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fb238d7e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fb238d7e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fb238d7e
Branch: refs/heads/trunk
Commit: fb238d7e5dcd96466c8938b13ca7f13cedecb08a
Parents: 2e8ab3d
Author: Karthik Kambatla <ka...@apache.org>
Authored: Wed Jan 27 11:47:29 2016 -0800
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Wed Jan 27 12:29:06 2016 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 2 +
.../scheduler/fair/AllocationConfiguration.java | 11 +-
.../fair/AllocationFileLoaderService.java | 16 +-
.../scheduler/fair/FSParentQueue.java | 8 +
.../resourcemanager/scheduler/fair/FSQueue.java | 11 +-
.../webapp/FairSchedulerPage.java | 1 +
.../webapp/dao/FairSchedulerQueueInfo.java | 7 +
.../scheduler/fair/TestFairScheduler.java | 327 +++++++++++++++++++
8 files changed, 377 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb238d7e/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 2fbecdb..2fae034 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -67,6 +67,8 @@ Release 2.9.0 - UNRELEASED
YARN-1856. Added cgroups based memory monitoring for containers as another
alternative to custom memory-monitoring. (Varun Vasudev via vinodkv)
+ YARN-4462. FairScheduler: Disallow preemption from a queue. (Tao Jie via kasha)
+
IMPROVEMENTS
YARN-4072. ApplicationHistoryServer, WebAppProxyServer, NodeManager and
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb238d7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
index bf4eae8..180ae49 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
@@ -98,6 +98,8 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
// Reservation system configuration
private ReservationQueueConfiguration globalReservationQueueConfig;
+ private final Set<String> nonPreemptableQueues;
+
public AllocationConfiguration(Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources,
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
@@ -114,7 +116,8 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
QueuePlacementPolicy placementPolicy,
Map<FSQueueType, Set<String>> configuredQueues,
ReservationQueueConfiguration globalReservationQueueConfig,
- Set<String> reservableQueues) {
+ Set<String> reservableQueues,
+ Set<String> nonPreemptableQueues) {
this.minQueueResources = minQueueResources;
this.maxQueueResources = maxQueueResources;
this.queueMaxApps = queueMaxApps;
@@ -135,6 +138,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
this.globalReservationQueueConfig = globalReservationQueueConfig;
this.placementPolicy = placementPolicy;
this.configuredQueues = configuredQueues;
+ this.nonPreemptableQueues = nonPreemptableQueues;
}
public AllocationConfiguration(Configuration conf) {
@@ -161,6 +165,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
}
placementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
configuredQueues);
+ nonPreemptableQueues = new HashSet<String>();
}
/**
@@ -210,6 +215,10 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
-1f : fairSharePreemptionThreshold;
}
+ public boolean isPreemptable(String queueName) {
+ return !nonPreemptableQueues.contains(queueName);
+ }
+
public ResourceWeights getQueueWeight(String queue) {
ResourceWeights weight = queueWeights.get(queue);
return (weight == null) ? ResourceWeights.NEUTRAL : weight;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb238d7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
index 9a31be3..d6012af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
@@ -224,6 +224,7 @@ public class AllocationFileLoaderService extends AbstractService {
Map<String, Map<QueueACL, AccessControlList>> queueAcls =
new HashMap<String, Map<QueueACL, AccessControlList>>();
Set<String> reservableQueues = new HashSet<String>();
+ Set<String> nonPreemptableQueues = new HashSet<String>();
int userMaxAppsDefault = Integer.MAX_VALUE;
int queueMaxAppsDefault = Integer.MAX_VALUE;
Resource queueMaxResourcesDefault = Resources.unbounded();
@@ -360,7 +361,7 @@ public class AllocationFileLoaderService extends AbstractService {
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts,
fairSharePreemptionThresholds, queueAcls, configuredQueues,
- reservableQueues);
+ reservableQueues, nonPreemptableQueues);
}
// Load placement policy and pass it configured queues
@@ -409,7 +410,7 @@ public class AllocationFileLoaderService extends AbstractService {
defaultSchedPolicy, minSharePreemptionTimeouts,
fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
newPlacementPolicy, configuredQueues, globalReservationQueueConfig,
- reservableQueues);
+ reservableQueues, nonPreemptableQueues);
lastSuccessfulReload = clock.getTime();
lastReloadAttemptFailed = false;
@@ -431,7 +432,8 @@ public class AllocationFileLoaderService extends AbstractService {
Map<String, Float> fairSharePreemptionThresholds,
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
Map<FSQueueType, Set<String>> configuredQueues,
- Set<String> reservableQueues)
+ Set<String> reservableQueues,
+ Set<String> nonPreemptableQueues)
throws AllocationConfigurationException {
String queueName = element.getAttribute("name").trim();
@@ -508,13 +510,19 @@ public class AllocationFileLoaderService extends AbstractService {
isLeaf = false;
reservableQueues.add(queueName);
configuredQueues.get(FSQueueType.PARENT).add(queueName);
+ } else if ("allowPreemptionFrom".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ if (!Boolean.parseBoolean(text)) {
+ nonPreemptableQueues.add(queueName);
+ }
} else if ("queue".endsWith(field.getTagName()) ||
"pool".equals(field.getTagName())) {
loadQueue(queueName, field, minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
queuePolicies, minSharePreemptionTimeouts,
fairSharePreemptionTimeouts, fairSharePreemptionThresholds,
- queueAcls, configuredQueues, reservableQueues);
+ queueAcls, configuredQueues, reservableQueues,
+ nonPreemptableQueues);
isLeaf = false;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb238d7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
index febe050..a028422 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
@@ -260,6 +260,14 @@ public class FSParentQueue extends FSQueue {
readLock.lock();
try {
for (FSQueue queue : childQueues) {
+ // Skip selection for non-preemptable queue
+ if (!queue.isPreemptable()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("skipping from queue=" + getName()
+ + " because it's a non-preemptable queue");
+ }
+ continue;
+ }
if (candidateQueue == null ||
comparator.compare(queue, candidateQueue) > 0) {
candidateQueue = queue;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb238d7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
index 713bdca..f82411d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
@@ -62,6 +62,7 @@ public abstract class FSQueue implements Queue, Schedulable {
private long fairSharePreemptionTimeout = Long.MAX_VALUE;
private long minSharePreemptionTimeout = Long.MAX_VALUE;
private float fairSharePreemptionThreshold = 0.5f;
+ private boolean preemptable = true;
public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
this.name = name;
@@ -235,6 +236,10 @@ public abstract class FSQueue implements Queue, Schedulable {
this.fairSharePreemptionThreshold = fairSharePreemptionThreshold;
}
+ public boolean isPreemptable() {
+ return preemptable;
+ }
+
/**
* Recomputes the shares for all child queues and applications based on this
* queue's current share
@@ -242,7 +247,8 @@ public abstract class FSQueue implements Queue, Schedulable {
public abstract void recomputeShares();
/**
- * Update the min/fair share preemption timeouts and threshold for this queue.
+ * Update the min/fair share preemption timeouts, threshold and the
+ * preemptable flag for this queue.
*/
public void updatePreemptionVariables() {
// For min share timeout
@@ -263,6 +269,9 @@ public abstract class FSQueue implements Queue, Schedulable {
if (fairSharePreemptionThreshold < 0 && parent != null) {
fairSharePreemptionThreshold = parent.getFairSharePreemptionThreshold();
}
+ // Whether preemption from this queue is allowed
+ preemptable = scheduler.getAllocationConfiguration()
+ .isPreemptable(getName());
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb238d7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
index 5ff9422..689622f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
@@ -81,6 +81,7 @@ public class FairSchedulerPage extends RmView {
}
ri._(STEADY_FAIR_SHARE + ":", qinfo.getSteadyFairShare().toString());
ri._(INSTANTANEOUS_FAIR_SHARE + ":", qinfo.getFairShare().toString());
+ ri._("Preemptable:", qinfo.isPreemptable());
html._(InfoBlock.class);
// clear the info contents so this queue's info doesn't accumulate into another queue's info
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb238d7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
index ee37f18..e02df65 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
@@ -65,6 +65,8 @@ public class FairSchedulerQueueInfo {
private String queueName;
private String schedulingPolicy;
+ private boolean preemptable;
+
private FairSchedulerQueueInfoList childQueues;
public FairSchedulerQueueInfo() {
@@ -108,6 +110,7 @@ public class FairSchedulerQueueInfo {
return;
}
+ preemptable = queue.isPreemptable();
childQueues = getChildQueues(queue, scheduler);
}
@@ -228,4 +231,8 @@ public class FairSchedulerQueueInfo {
return childQueues != null ? childQueues.getQueueInfoList() :
new ArrayList<FairSchedulerQueueInfo>();
}
+
+ public boolean isPreemptable() {
+ return preemptable;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb238d7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 1b1418a..fac28b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -2493,6 +2493,333 @@ public class TestFairScheduler extends FairSchedulerTestBase {
}
@Test
+ /**
+ * Tests the decision to preempt tasks with respect to non-preemptable queues.
+ * 1, Queues as follows:
+ * queueA(non-preemptable)
+ * queueB(preemptable)
+ * parentQueue(non-preemptable)
+ * --queueC(preemptable)
+ * queueD(preemptable)
+ * 2, Submit requests to queueA, queueB, queueC, so all of them are over MinShare
+ * 3, Now all resources are occupied
+ * 4, Submit a request to queueD, which needs to preempt resources from other queues
+ * 5, Only the preemptable queue (queueB) would be preempted.
+ */
+ public void testPreemptionDecisionWithNonPreemptableQueue() throws Exception {
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ ControlledClock clock = new ControlledClock();
+ scheduler.setClock(clock);
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("<queue name=\"default\">");
+ out.println("<maxResources>0mb,0vcores</maxResources>");
+ out.println("</queue>");
+ out.println("<queue name=\"queueA\">");
+ out.println("<weight>.25</weight>");
+ out.println("<minResources>1024mb,0vcores</minResources>");
+ out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
+ out.println("</queue>");
+ out.println("<queue name=\"queueB\">");
+ out.println("<weight>.25</weight>");
+ out.println("<minResources>1024mb,0vcores</minResources>");
+ out.println("</queue>");
+ out.println("<queue name=\"parentQueue\">");
+ out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
+ out.println("<queue name=\"queueC\">");
+ out.println("<weight>.25</weight>");
+ out.println("<minResources>1024mb,0vcores</minResources>");
+ out.println("</queue>");
+ out.println("</queue>");
+ out.println("<queue name=\"queueD\">");
+ out.println("<weight>.25</weight>");
+ out.println("<minResources>2048mb,0vcores</minResources>");
+ out.println("</queue>");
+ out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
+ out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
+ out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
+ out.println("</allocations>");
+ out.close();
+
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Create four nodes(3G each)
+ RMNode node1 =
+ MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1,
+ "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ RMNode node2 =
+ MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2,
+ "127.0.0.2");
+ NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+ scheduler.handle(nodeEvent2);
+
+ RMNode node3 =
+ MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3,
+ "127.0.0.3");
+ NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+ scheduler.handle(nodeEvent3);
+
+ RMNode node4 =
+ MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4,
+ "127.0.0.4");
+ NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
+ scheduler.handle(nodeEvent4);
+
+ // Submit apps to queueA, queueB, queueC;
+ // now all resources of the cluster are occupied
+ ApplicationAttemptId app1 =
+ createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1);
+ ApplicationAttemptId app2 =
+ createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 2);
+ ApplicationAttemptId app3 =
+ createSchedulingRequest(1 * 1024, "parentQueue.queueC", "user1", 4, 3);
+
+ scheduler.update();
+
+ // Sufficient node check-ins to fully schedule containers
+ for (int i = 0; i < 3; i++) {
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+ scheduler.handle(nodeUpdate1);
+
+ NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+ scheduler.handle(nodeUpdate2);
+
+ NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
+ scheduler.handle(nodeUpdate3);
+
+ NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
+ scheduler.handle(nodeUpdate4);
+ }
+
+ assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+ assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+ assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+
+ // Now new requests arrive from queueD
+ ApplicationAttemptId app4 =
+ createSchedulingRequest(1 * 1024, "queueD", "user1", 4, 1);
+ scheduler.update();
+ FSLeafQueue schedD =
+ scheduler.getQueueManager().getLeafQueue("queueD", true);
+
+ // After minSharePreemptionTime has passed, 2G of resources should be preempted
+ // from queueB for queueD
+ clock.tickSec(6);
+ assertEquals(2048,
+ scheduler.resourceDeficit(schedD, clock.getTime()).getMemory());
+
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
+ // now only app2 is selected to be preempted
+ assertTrue("App2 should have container to be preempted",
+ !Collections.disjoint(
+ scheduler.getSchedulerApp(app2).getLiveContainers(),
+ scheduler.getSchedulerApp(app2).getPreemptionContainers()));
+ assertTrue("App1 should not have container to be preempted",
+ Collections.disjoint(
+ scheduler.getSchedulerApp(app1).getLiveContainers(),
+ scheduler.getSchedulerApp(app1).getPreemptionContainers()));
+ assertTrue("App3 should not have container to be preempted",
+ Collections.disjoint(
+ scheduler.getSchedulerApp(app3).getLiveContainers(),
+ scheduler.getSchedulerApp(app3).getPreemptionContainers()));
+ // Pretend 20 seconds have passed
+ clock.tickSec(20);
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
+ for (int i = 0; i < 3; i++) {
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+ scheduler.handle(nodeUpdate1);
+
+ NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+ scheduler.handle(nodeUpdate2);
+
+ NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
+ scheduler.handle(nodeUpdate3);
+
+ NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
+ scheduler.handle(nodeUpdate4);
+ }
+ // after preemption
+ assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+ assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+ assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+ assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size());
+ }
+
+ @Test
+ /**
+ * Tests the decision to preempt tasks when allowPreemptionFrom is false for
+ * every leaf queue, either on the queue itself or on its parent.
+ * Then none of them would actually be preempted.
+ * 1, Queues as follows:
+ * queueA(non-preemptable)
+ * queueB(non-preemptable)
+ * parentQueue1(non-preemptable)
+ * --queueC(preemptable)
+ * parentQueue2(preemptable)
+ * --queueD(non-preemptable)
+ * 2, Submit requests to queueB, queueC, queueD, so all of them are over MinShare
+ * 3, Now all resources are occupied
+ * 4, Submit a request to queueA, which needs to preempt resources from other queues
+ * 5, None of the queues would be preempted.
+ */
+ public void testPreemptionDecisionWhenPreemptionDisabledOnAllQueues()
+ throws Exception {
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ ControlledClock clock = new ControlledClock();
+ scheduler.setClock(clock);
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("<queue name=\"default\">");
+ out.println("<maxResources>0mb,0vcores</maxResources>");
+ out.println("</queue>");
+ out.println("<queue name=\"queueA\">");
+ out.println("<weight>.25</weight>");
+ out.println("<minResources>2048mb,0vcores</minResources>");
+ out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
+ out.println("</queue>");
+ out.println("<queue name=\"queueB\">");
+ out.println("<weight>.25</weight>");
+ out.println("<minResources>1024mb,0vcores</minResources>");
+ out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
+ out.println("</queue>");
+ out.println("<queue name=\"parentQueue1\">");
+ out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
+ out.println("<queue name=\"queueC\">");
+ out.println("<weight>.25</weight>");
+ out.println("<minResources>1024mb,0vcores</minResources>");
+ out.println("</queue>");
+ out.println("</queue>");
+ out.println("<queue name=\"parentQueue2\">");
+ out.println("<queue name=\"queueD\">");
+ out.println("<weight>.25</weight>");
+ out.println("<minResources>1024mb,0vcores</minResources>");
+ out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
+ out.println("</queue>");
+ out.println("</queue>");
+ out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
+ out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
+ out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
+ out.println("</allocations>");
+ out.close();
+
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Create four nodes(3G each)
+ RMNode node1 =
+ MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1,
+ "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ RMNode node2 =
+ MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2,
+ "127.0.0.2");
+ NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+ scheduler.handle(nodeEvent2);
+
+ RMNode node3 =
+ MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3,
+ "127.0.0.3");
+ NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+ scheduler.handle(nodeEvent3);
+
+ RMNode node4 =
+ MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4,
+ "127.0.0.4");
+ NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
+ scheduler.handle(nodeEvent4);
+
+ // Submit apps to queueB, queueC, queueD;
+ // now all resources of the cluster are occupied
+
+ ApplicationAttemptId app1 =
+ createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 1);
+ ApplicationAttemptId app2 =
+ createSchedulingRequest(1 * 1024, "parentQueue1.queueC", "user1", 4, 2);
+ ApplicationAttemptId app3 =
+ createSchedulingRequest(1 * 1024, "parentQueue2.queueD", "user1", 4, 3);
+ scheduler.update();
+
+ // Sufficient node check-ins to fully schedule containers
+ for (int i = 0; i < 3; i++) {
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+ scheduler.handle(nodeUpdate1);
+
+ NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+ scheduler.handle(nodeUpdate2);
+
+ NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
+ scheduler.handle(nodeUpdate3);
+
+ NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
+ scheduler.handle(nodeUpdate4);
+ }
+
+ assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+ assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+ assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+
+ // Now new requests arrive from queueA
+ ApplicationAttemptId app4 =
+ createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1);
+ scheduler.update();
+ FSLeafQueue schedA =
+ scheduler.getQueueManager().getLeafQueue("queueA", true);
+
+ // After minSharePreemptionTime has passed, resource deficit is 2G
+ clock.tickSec(6);
+ assertEquals(2048,
+ scheduler.resourceDeficit(schedA, clock.getTime()).getMemory());
+
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
+ // now no app is selected to be preempted
+ assertTrue("App1 should not have container to be preempted",
+ Collections.disjoint(
+ scheduler.getSchedulerApp(app1).getLiveContainers(),
+ scheduler.getSchedulerApp(app1).getPreemptionContainers()));
+ assertTrue("App2 should not have container to be preempted",
+ Collections.disjoint(
+ scheduler.getSchedulerApp(app2).getLiveContainers(),
+ scheduler.getSchedulerApp(app2).getPreemptionContainers()));
+ assertTrue("App3 should not have container to be preempted",
+ Collections.disjoint(
+ scheduler.getSchedulerApp(app3).getLiveContainers(),
+ scheduler.getSchedulerApp(app3).getPreemptionContainers()));
+ // Pretend 20 seconds have passed
+ clock.tickSec(20);
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
+ for (int i = 0; i < 3; i++) {
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+ scheduler.handle(nodeUpdate1);
+
+ NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+ scheduler.handle(nodeUpdate2);
+
+ NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
+ scheduler.handle(nodeUpdate3);
+
+ NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
+ scheduler.handle(nodeUpdate4);
+ }
+ // after preemption
+ assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+ assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+ assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+ assertEquals(0, scheduler.getSchedulerApp(app4).getLiveContainers().size());
+ }
+
+ @Test
public void testBackwardsCompatiblePreemptionConfiguration() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);