You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2017/02/24 19:34:32 UTC
[05/50] [abbrv] hadoop git commit: YARN-6210. FairScheduler: Node reservations can interfere with preemption. (kasha)
YARN-6210. FairScheduler: Node reservations can interfere with preemption. (kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/718ad9f6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/718ad9f6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/718ad9f6
Branch: refs/heads/YARN-2915
Commit: 718ad9f6ee93d4145f2bb19b7582ce4e1174feaf
Parents: 732ee6f
Author: Karthik Kambatla <ka...@cloudera.com>
Authored: Wed Feb 22 15:45:45 2017 -0800
Committer: Karthik Kambatla <ka...@cloudera.com>
Committed: Wed Feb 22 15:46:07 2017 -0800
----------------------------------------------------------------------
.../resource/DefaultResourceCalculator.java | 3 +-
.../resource/DominantResourceCalculator.java | 13 +-
.../yarn/util/resource/ResourceCalculator.java | 32 ++++-
.../scheduler/fair/FSAppAttempt.java | 61 ++++++---
.../DominantResourceFairnessPolicy.java | 8 +-
.../fair/policies/FairSharePolicy.java | 3 +-
.../scheduler/fair/TestFairScheduler.java | 127 ++++++++-----------
.../fair/TestFairSchedulerPreemption.java | 44 +++++--
8 files changed, 180 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/718ad9f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
index 42c45ad..ef7229c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
@@ -30,7 +30,8 @@ public class DefaultResourceCalculator extends ResourceCalculator {
LogFactory.getLog(DefaultResourceCalculator.class);
@Override
- public int compare(Resource unused, Resource lhs, Resource rhs) {
+ public int compare(Resource unused, Resource lhs, Resource rhs,
+ boolean singleType) {
// Only consider memory
return Long.compare(lhs.getMemorySize(), rhs.getMemorySize());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/718ad9f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
index 9f1c8d7..032aa02 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
@@ -51,17 +51,18 @@ public class DominantResourceCalculator extends ResourceCalculator {
LogFactory.getLog(DominantResourceCalculator.class);
@Override
- public int compare(Resource clusterResource, Resource lhs, Resource rhs) {
+ public int compare(Resource clusterResource, Resource lhs, Resource rhs,
+ boolean singleType) {
if (lhs.equals(rhs)) {
return 0;
}
if (isInvalidDivisor(clusterResource)) {
- if ((lhs.getMemorySize() < rhs.getMemorySize() && lhs.getVirtualCores() > rhs
- .getVirtualCores())
- || (lhs.getMemorySize() > rhs.getMemorySize() && lhs.getVirtualCores() < rhs
- .getVirtualCores())) {
+ if ((lhs.getMemorySize() < rhs.getMemorySize() &&
+ lhs.getVirtualCores() > rhs.getVirtualCores()) ||
+ (lhs.getMemorySize() > rhs.getMemorySize() &&
+ lhs.getVirtualCores() < rhs.getVirtualCores())) {
return 0;
} else if (lhs.getMemorySize() > rhs.getMemorySize()
|| lhs.getVirtualCores() > rhs.getVirtualCores()) {
@@ -79,7 +80,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
return -1;
} else if (l > r) {
return 1;
- } else {
+ } else if (!singleType) {
l = getResourceAsValue(clusterResource, lhs, false);
r = getResourceAsValue(clusterResource, rhs, false);
if (l < r) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/718ad9f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
index 50ce04c..a2f85b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
@@ -28,8 +28,36 @@ import org.apache.hadoop.yarn.api.records.Resource;
@Unstable
public abstract class ResourceCalculator {
- public abstract int
- compare(Resource clusterResource, Resource lhs, Resource rhs);
+ /**
+ * On a cluster with capacity {@code clusterResource}, compare {@code lhs}
+ * and {@code rhs}. Consider all resources unless {@code singleType} is set
+ * to true. When {@code singleType} is set to true, consider only one
+ * resource as per the {@link ResourceCalculator} implementation; the
+ * {@link DefaultResourceCalculator} considers memory and
+ * {@link DominantResourceCalculator} considers the dominant resource.
+ *
+ * @param clusterResource cluster capacity
+ * @param lhs First {@link Resource} to compare
+ * @param rhs Second {@link Resource} to compare
+ * @param singleType Whether to consider a single resource type or all
+ * resource types
+ * @return -1 if {@code lhs} is smaller, 0 if equal and 1 if it is larger
+ */
+ public abstract int compare(
+ Resource clusterResource, Resource lhs, Resource rhs, boolean singleType);
+
+ /**
+ * On a cluster with capacity {@code clusterResource}, compare {@code lhs}
+ * and {@code rhs} considering all resources.
+ *
+ * @param clusterResource cluster capacity
+ * @param lhs First {@link Resource} to compare
+ * @param rhs Second {@link Resource} to compare
+ * @return -1 if {@code lhs} is smaller, 0 if equal and 1 if it is larger
+ */
+ public int compare(Resource clusterResource, Resource lhs, Resource rhs) {
+ return compare(clusterResource, lhs, rhs, false);
+ }
public static int divideAndCeil(int a, int b) {
if (b == 0) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/718ad9f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 6ed0660..6c61b45 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -605,8 +604,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
Resource usageAfterPreemption = Resources.subtract(
getResourceUsage(), container.getAllocatedResource());
- return !Resources.lessThan(fsQueue.getPolicy().getResourceCalculator(),
- scheduler.getClusterResource(), usageAfterPreemption, getFairShare());
+ return !isUsageBelowShare(usageAfterPreemption, getFairShare());
}
/**
@@ -833,9 +831,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
}
// The desired container won't fit here, so reserve
- if (isReservable(capability) && reserve(
- pendingAsk.getPerAllocationResource(), node, reservedContainer, type,
- schedulerKey)) {
+ if (isReservable(capability) &&
+ reserve(pendingAsk.getPerAllocationResource(), node, reservedContainer,
+ type, schedulerKey)) {
if (isWaitingForAMContainer()) {
updateAMDiagnosticMsg(capability,
" exceed the available resources of the node and the request is"
@@ -857,8 +855,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
}
private boolean isReservable(Resource capacity) {
- return scheduler.isAtLeastReservationThreshold(
- getQueue().getPolicy().getResourceCalculator(), capacity);
+ // Reserve only when the app is starved and the requested container size
+ // is larger than the configured threshold
+ return isStarved() &&
+ scheduler.isAtLeastReservationThreshold(
+ getQueue().getPolicy().getResourceCalculator(), capacity);
}
/**
@@ -1089,34 +1090,51 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
* @return freshly computed fairshare starvation
*/
Resource fairShareStarvation() {
+ long now = scheduler.getClock().getTime();
Resource threshold = Resources.multiply(
getFairShare(), fsQueue.getFairSharePreemptionThreshold());
- Resource starvation = Resources.componentwiseMin(threshold, demand);
- Resources.subtractFromNonNegative(starvation, getResourceUsage());
+ Resource fairDemand = Resources.componentwiseMin(threshold, demand);
- long now = scheduler.getClock().getTime();
- boolean starved = !Resources.isNone(starvation);
+ // Check if the queue is starved for fairshare
+ boolean starved = isUsageBelowShare(getResourceUsage(), fairDemand);
if (!starved) {
lastTimeAtFairShare = now;
}
- if (starved &&
- (now - lastTimeAtFairShare > fsQueue.getFairSharePreemptionTimeout())) {
- this.fairshareStarvation = starvation;
+ if (!starved ||
+ now - lastTimeAtFairShare < fsQueue.getFairSharePreemptionTimeout()) {
+ fairshareStarvation = Resources.none();
} else {
- this.fairshareStarvation = Resources.none();
+ // The app has been starved for longer than preemption-timeout.
+ fairshareStarvation =
+ Resources.subtractFromNonNegative(fairDemand, getResourceUsage());
}
- return this.fairshareStarvation;
+ return fairshareStarvation;
+ }
+
+ /**
+ * Helper method that checks if {@code usage} is strictly less than
+ * {@code share}.
+ */
+ private boolean isUsageBelowShare(Resource usage, Resource share) {
+ return fsQueue.getPolicy().getResourceCalculator().compare(
+ scheduler.getClusterResource(), usage, share, true) < 0;
}
/**
* Helper method that captures if this app is identified to be starved.
* @return true if the app is starved for fairshare, false otherwise
*/
- @VisibleForTesting
boolean isStarvedForFairShare() {
- return !Resources.isNone(fairshareStarvation);
+ return isUsageBelowShare(getResourceUsage(), getFairShare());
+ }
+
+ /**
+ * Is application starved for fairshare or minshare
+ */
+ private boolean isStarved() {
+ return isStarvedForFairShare() || !Resources.isNone(minshareStarvation);
}
/**
@@ -1333,6 +1351,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
}
@Override
+ public String toString() {
+ return getApplicationAttemptId() + " Alloc: " + getCurrentConsumption();
+ }
+
+ @Override
public boolean isPreemptable() {
return getQueue().isPreemptable();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/718ad9f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
index 6f04cb7..369b8a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
@@ -155,8 +155,12 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
resourceOrder1, resourceOrder2);
}
if (res == 0) {
- // Apps are tied in fairness ratio. Break the tie by submit time.
- res = (int)(s1.getStartTime() - s2.getStartTime());
+ // Apps are tied in fairness ratio. Break the tie by submit time and job
+ // name to get a deterministic ordering, which is useful for unit tests.
+ res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
+ if (res == 0) {
+ res = s1.getName().compareTo(s2.getName());
+ }
}
return res;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/718ad9f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
index 9036a03..f8cdb45 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
@@ -131,8 +131,9 @@ public class FairSharePolicy extends SchedulingPolicy {
// Apps are tied in fairness ratio. Break the tie by submit time and job
// name to get a deterministic ordering, which is useful for unit tests.
res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
- if (res == 0)
+ if (res == 0) {
res = s1.getName().compareTo(s2.getName());
+ }
}
return res;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/718ad9f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 0c3a614..4def53f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -119,6 +119,7 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mock;
import org.mockito.Mockito;
import org.xml.sax.SAXException;
@@ -2627,71 +2628,57 @@ public class TestFairScheduler extends FairSchedulerTestBase {
assertEquals(1, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
}
+ /**
+ * Reserve at a lower priority and verify the lower priority request gets
+ * allocated
+ */
@Test (timeout = 5000)
- public void testReservationWhileMultiplePriorities() throws IOException {
+ public void testReservationWithMultiplePriorities() throws IOException {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
- RMNode node1 =
- MockNodes
- .newNodeInfo(1, Resources.createResource(1024, 4), 1, "127.0.0.1");
+ RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 2));
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
-
- ApplicationAttemptId attId = createSchedulingRequest(1024, 4, "queue1",
- "user1", 1, 2);
- scheduler.update();
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
- scheduler.handle(updateEvent);
-
- FSAppAttempt app = scheduler.getSchedulerApp(attId);
- assertEquals(1, app.getLiveContainers().size());
-
- ContainerId containerId = scheduler.getSchedulerApp(attId)
- .getLiveContainers().iterator().next().getContainerId();
- // Cause reservation to be created
- createSchedulingRequestExistingApplication(1024, 4, 2, attId);
+ // Create first app and take up half resources so the second app that asks
+ // for the entire node won't have enough.
+ FSAppAttempt app1 = scheduler.getSchedulerApp(
+ createSchedulingRequest(1024, 1, "queue", "user", 1));
scheduler.update();
scheduler.handle(updateEvent);
+ assertEquals("Basic allocation failed", 1, app1.getLiveContainers().size());
- assertEquals(1, app.getLiveContainers().size());
- assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
- assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores());
-
- // Create request at higher priority
- createSchedulingRequestExistingApplication(1024, 4, 1, attId);
+ // Create another app and reserve at a lower priority first
+ ApplicationAttemptId attId =
+ createSchedulingRequest(2048, 2, "queue1", "user1", 1, 2);
+ FSAppAttempt app2 = scheduler.getSchedulerApp(attId);
scheduler.update();
scheduler.handle(updateEvent);
-
- assertEquals(1, app.getLiveContainers().size());
- // Reserved container should still be at lower priority
- for (RMContainer container : app.getReservedContainers()) {
- assertEquals(2,
- container.getReservedSchedulerKey().getPriority().getPriority());
- }
-
- // Complete container
- scheduler.allocate(attId, new ArrayList<ResourceRequest>(),
+ assertEquals("Reservation at lower priority failed",
+ 1, app2.getReservedContainers().size());
+
+ // Request container on the second app at a higher priority
+ createSchedulingRequestExistingApplication(2048, 2, 1, attId);
+
+ // Complete the first container so we can trigger allocation for app2
+ ContainerId containerId =
+ app1.getLiveContainers().iterator().next().getContainerId();
+ scheduler.allocate(app1.getApplicationAttemptId(), new ArrayList<>(),
Arrays.asList(containerId), null, null, NULL_UPDATE_REQUESTS);
- assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB());
- assertEquals(4, scheduler.getRootQueueMetrics().getAvailableVirtualCores());
-
- // Schedule at opening
- scheduler.update();
+
+ // Trigger allocation for app2
scheduler.handle(updateEvent);
-
+
// Reserved container (at lower priority) should be run
- Collection<RMContainer> liveContainers = app.getLiveContainers();
- assertEquals(1, liveContainers.size());
- for (RMContainer liveContainer : liveContainers) {
- Assert.assertEquals(2, liveContainer.getContainer().getPriority()
- .getPriority());
- }
- assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
- assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores());
+ Collection<RMContainer> liveContainers = app2.getLiveContainers();
+ assertEquals("Allocation post completion failed", 1, liveContainers.size());
+ assertEquals("High prio container allocated against low prio reservation",
+ 2, liveContainers.iterator().next().getContainer().
+ getPriority().getPriority());
}
@Test
@@ -3222,8 +3209,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
}
/**
- * If we update our ask to strictly request a node, it doesn't make sense to keep
- * a reservation on another.
+ * Strict locality requests shouldn't reserve resources on another node.
*/
@Test
public void testReservationsStrictLocality() throws IOException {
@@ -3231,40 +3217,39 @@ public class TestFairScheduler extends FairSchedulerTestBase {
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
- RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
- RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2");
+ // Add two nodes
+ RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024, 1));
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
+ RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024, 1));
+ NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+ scheduler.handle(nodeEvent2);
- ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1",
- "user1", 0);
+ // Submit application without container requests
+ ApplicationAttemptId attId =
+ createSchedulingRequest(1024, "queue1", "user1", 0);
FSAppAttempt app = scheduler.getSchedulerApp(attId);
-
- ResourceRequest nodeRequest = createResourceRequest(1024, node2.getHostName(), 1, 2, true);
- ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 2, true);
- ResourceRequest anyRequest = createResourceRequest(1024, ResourceRequest.ANY,
- 1, 2, false);
+
+ // Request a container on node2
+ ResourceRequest nodeRequest =
+ createResourceRequest(1024, node2.getHostName(), 1, 1, true);
+ ResourceRequest rackRequest =
+ createResourceRequest(1024, "rack1", 1, 1, false);
+ ResourceRequest anyRequest =
+ createResourceRequest(1024, ResourceRequest.ANY, 1, 1, false);
createSchedulingRequestExistingApplication(nodeRequest, attId);
createSchedulingRequestExistingApplication(rackRequest, attId);
createSchedulingRequestExistingApplication(anyRequest, attId);
-
scheduler.update();
+ // Heartbeat from node1. App shouldn't get an allocation or reservation
NodeUpdateSchedulerEvent nodeUpdateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(nodeUpdateEvent);
- assertEquals(1, app.getLiveContainers().size());
- scheduler.handle(nodeUpdateEvent);
- assertEquals(1, app.getReservedContainers().size());
-
- // now, make our request node-specific (on a different node)
- rackRequest = createResourceRequest(1024, "rack1", 1, 1, false);
- anyRequest = createResourceRequest(1024, ResourceRequest.ANY,
- 1, 1, false);
- scheduler.allocate(attId, Arrays.asList(rackRequest, anyRequest),
- new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS);
-
+ assertEquals("App assigned a container on the wrong node",
+ 0, app.getLiveContainers().size());
scheduler.handle(nodeUpdateEvent);
- assertEquals(0, app.getReservedContainers().size());
+ assertEquals("App reserved a container on the wrong node",
+ 0, app.getReservedContainers().size());
}
@Test
http://git-wip-us.apache.org/repos/asf/hadoop/blob/718ad9f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 480a329..322ad5b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -72,7 +72,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
{"MinSharePreemptionWithDRF", 1},
{"FairSharePreemption", 2},
{"FairSharePreemptionWithDRF", 3}
- });
+ });
}
public TestFairSchedulerPreemption(String name, int mode)
@@ -110,6 +110,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
* |--- preemptable
* |--- child-1
* |--- child-2
+ * |--- preemptable-sibling
* |--- nonpreemptable
* |--- child-1
* |--- child-2
@@ -133,6 +134,10 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
out.println("</queue>"); // end of preemptable queue
+ out.println("<queue name=\"preemptable-sibling\">");
+ writePreemptionParams(out);
+ out.println("</queue>");
+
// Queue with preemption disallowed
out.println("<queue name=\"nonpreemptable\">");
out.println("<allowPreemptionFrom>false" +
@@ -269,10 +274,11 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
preemptHalfResources(queue2);
}
- private void verifyPreemption() throws InterruptedException {
+ private void verifyPreemption(int numStarvedAppContainers)
+ throws InterruptedException {
// Sleep long enough for four containers to be preempted.
for (int i = 0; i < 1000; i++) {
- if (greedyApp.getLiveContainers().size() == 4) {
+ if (greedyApp.getLiveContainers().size() == 2 * numStarvedAppContainers) {
break;
}
Thread.sleep(10);
@@ -280,13 +286,13 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
// Verify the right amount of containers are preempted from greedyApp
assertEquals("Incorrect number of containers on the greedy app",
- 4, greedyApp.getLiveContainers().size());
+ 2 * numStarvedAppContainers, greedyApp.getLiveContainers().size());
sendEnoughNodeUpdatesToAssignFully();
// Verify the preempted containers are assigned to starvingApp
assertEquals("Starved app is not assigned the right number of containers",
- 2, starvingApp.getLiveContainers().size());
+ numStarvedAppContainers, starvingApp.getLiveContainers().size());
}
private void verifyNoPreemption() throws InterruptedException {
@@ -305,7 +311,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
String queue = "root.preemptable.child-1";
submitApps(queue, queue);
if (fairsharePreemption) {
- verifyPreemption();
+ verifyPreemption(2);
} else {
verifyNoPreemption();
}
@@ -314,13 +320,13 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
@Test
public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
submitApps("root.preemptable.child-1", "root.preemptable.child-2");
- verifyPreemption();
+ verifyPreemption(2);
}
@Test
public void testPreemptionBetweenNonSiblingQueues() throws Exception {
submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1");
- verifyPreemption();
+ verifyPreemption(2);
}
@Test
@@ -354,7 +360,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
setNumAMContainersPerNode(2);
preemptHalfResources("root.preemptable.child-2");
- verifyPreemption();
+ verifyPreemption(2);
ArrayList<RMContainer> containers =
(ArrayList<RMContainer>) starvingApp.getLiveContainers();
@@ -365,4 +371,24 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
assertTrue("Preempted containers should come from two different "
+ "nodes.", !host0.equals(host1));
}
+
+ @Test
+ public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare()
+ throws InterruptedException {
+ // Run this test only for fairshare preemption
+ if (!fairsharePreemption) {
+ return;
+ }
+
+ // Let one of the child queues take over the entire cluster
+ takeAllResources("root.preemptable.child-1");
+
+ // Submit a job so half the resources go to parent's sibling
+ preemptHalfResources("root.preemptable-sibling");
+ verifyPreemption(2);
+
+ // Submit a job to the child's sibling to force preemption from the child
+ preemptHalfResources("root.preemptable.child-2");
+ verifyPreemption(1);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org