You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2016/01/14 17:33:36 UTC
hadoop git commit: YARN-3446. FairScheduler headroom calculation
should exclude nodes in the blacklist. (Zhihai Xu via kasha)
Repository: hadoop
Updated Branches:
refs/heads/trunk 5cc44d18a -> 9d04f26d4
YARN-3446. FairScheduler headroom calculation should exclude nodes in the blacklist. (Zhihai Xu via kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9d04f26d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9d04f26d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9d04f26d
Branch: refs/heads/trunk
Commit: 9d04f26d4c42170ee3dab2f6fb09a94bbf72fc65
Parents: 5cc44d1
Author: Karthik Kambatla <ka...@cloudera.com>
Authored: Thu Jan 14 08:33:23 2016 -0800
Committer: Karthik Kambatla <ka...@cloudera.com>
Committed: Thu Jan 14 08:33:23 2016 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../scheduler/AbstractYarnScheduler.java | 15 ++++
.../scheduler/AppSchedulingInfo.java | 25 ++++--
.../scheduler/fair/FSAppAttempt.java | 25 ++++++
.../scheduler/TestAppSchedulingInfo.java | 73 ++++++++++++++++++
.../scheduler/fair/TestFSAppAttempt.java | 81 ++++++++++++++++++++
6 files changed, 215 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d04f26d/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index f7c22b3..14bade3 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -124,6 +124,9 @@ Release 2.9.0 - UNRELEASED
YARN-4567. javadoc failing on java 8. (Steve Loughran via aajisaka)
+ YARN-3446. FairScheduler headroom calculation should exclude nodes in the
+ blacklist. (Zhihai Xu via kasha)
+
Release 2.8.0 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d04f26d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index abd72bf..ed93acf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -181,6 +181,21 @@ public abstract class AbstractYarnScheduler
return applications;
}
+ /**
+ * Add blacklisted NodeIds to the list that is passed.
+ *
+ * @param app application attempt.
+ * @param blacklistNodeIdList the list to store blacklisted NodeIds.
+ */
+ public void addBlacklistedNodeIdsToList(SchedulerApplicationAttempt app,
+ List<NodeId> blacklistNodeIdList) {
+ for (Map.Entry<NodeId, N> nodeEntry : nodes.entrySet()) {
+ if (SchedulerAppUtils.isBlacklisted(app, nodeEntry.getValue(), LOG)) {
+ blacklistNodeIdList.add(nodeEntry.getKey());
+ }
+ }
+ }
+
@Override
public Resource getClusterResource() {
return clusterResource;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d04f26d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 41d3fd7..d5a5d9d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -30,6 +30,7 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -69,7 +70,7 @@ public class AppSchedulingInfo {
private ActiveUsersManager activeUsersManager;
private boolean pending = true; // whether accepted/allocated by scheduler
private ResourceUsage appResourceUsage;
-
+ private AtomicBoolean userBlacklistChanged = new AtomicBoolean(false);
private final Set<String> amBlacklist = new HashSet<>();
private Set<String> userBlacklist = new HashSet<>();
@@ -424,10 +425,12 @@ public class AppSchedulingInfo {
* @param blacklistAdditions resources to be added to the userBlacklist
* @param blacklistRemovals resources to be removed from the userBlacklist
*/
- public void updateBlacklist(
+ public void updateBlacklist(
List<String> blacklistAdditions, List<String> blacklistRemovals) {
- updateUserOrAMBlacklist(userBlacklist, blacklistAdditions,
- blacklistRemovals);
+ if (updateUserOrAMBlacklist(userBlacklist, blacklistAdditions,
+ blacklistRemovals)) {
+ userBlacklistChanged.set(true);
+ }
}
/**
@@ -441,17 +444,25 @@ public class AppSchedulingInfo {
blacklistRemovals);
}
- void updateUserOrAMBlacklist(Set<String> blacklist,
+ boolean updateUserOrAMBlacklist(Set<String> blacklist,
List<String> blacklistAdditions, List<String> blacklistRemovals) {
+ boolean changed = false;
synchronized (blacklist) {
if (blacklistAdditions != null) {
- blacklist.addAll(blacklistAdditions);
+ changed = blacklist.addAll(blacklistAdditions);
}
if (blacklistRemovals != null) {
- blacklist.removeAll(blacklistRemovals);
+ if (blacklist.removeAll(blacklistRemovals)) {
+ changed = true;
+ }
}
}
+ return changed;
+ }
+
+ public boolean getAndResetBlacklistChanged() {
+ return userBlacklistChanged.getAndSet(false);
}
public synchronized Collection<Priority> getPriorities() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d04f26d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 5f753dd..488f34e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.Serializable;
import java.text.DecimalFormat;
import java.util.Arrays;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
@@ -85,6 +86,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
// Key = RackName, Value = Set of Nodes reserved by app on rack
private Map<String, Set<String>> reservations = new HashMap<>();
+ private List<NodeId> blacklistNodeIds = new ArrayList<NodeId>();
/**
* Delay scheduling: We often want to prioritize scheduling of node-local
* containers over rack-local or off-switch containers. To achieve this
@@ -179,6 +181,27 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
+ this.attemptResourceUsage.getReserved());
}
+ private void subtractResourcesOnBlacklistedNodes(
+ Resource availableResources) {
+ if (appSchedulingInfo.getAndResetBlacklistChanged()) {
+ blacklistNodeIds.clear();
+ scheduler.addBlacklistedNodeIdsToList(this, blacklistNodeIds);
+ }
+ for (NodeId nodeId: blacklistNodeIds) {
+ SchedulerNode node = scheduler.getSchedulerNode(nodeId);
+ if (node != null) {
+ Resources.subtractFrom(availableResources,
+ node.getAvailableResource());
+ }
+ }
+ if (availableResources.getMemory() < 0) {
+ availableResources.setMemory(0);
+ }
+ if (availableResources.getVirtualCores() < 0) {
+ availableResources.setVirtualCores(0);
+ }
+ }
+
/**
* Headroom depends on resources in the cluster, current usage of the
* queue, queue's fair-share and queue's max-resources.
@@ -196,6 +219,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
Resource clusterAvailableResources =
Resources.subtract(clusterResource, clusterUsage);
+ subtractResourcesOnBlacklistedNodes(clusterAvailableResources);
+
Resource queueMaxAvailableResources =
Resources.subtract(queue.getMaxShare(), queueUsage);
Resource maxAvailableResource = Resources.componentwiseMin(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d04f26d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
new file mode 100644
index 0000000..4141a53
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.doReturn;
+import java.util.ArrayList;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAppSchedulingInfo {
+
+ @Test
+ public void testBacklistChanged() {
+ ApplicationId appIdImpl = ApplicationId.newInstance(0, 1);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appIdImpl, 1);
+
+ FSLeafQueue queue = mock(FSLeafQueue.class);
+ doReturn("test").when(queue).getQueueName();
+ AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
+ appAttemptId, "test", queue, null, 0, new ResourceUsage());
+
+ appSchedulingInfo.updateBlacklist(new ArrayList<String>(),
+ new ArrayList<String>());
+ Assert.assertFalse(appSchedulingInfo.getAndResetBlacklistChanged());
+
+ ArrayList<String> blacklistAdditions = new ArrayList<String>();
+ blacklistAdditions.add("node1");
+ blacklistAdditions.add("node2");
+ appSchedulingInfo.updateBlacklist(blacklistAdditions,
+ new ArrayList<String>());
+ Assert.assertTrue(appSchedulingInfo.getAndResetBlacklistChanged());
+
+ blacklistAdditions.clear();
+ blacklistAdditions.add("node1");
+ appSchedulingInfo.updateBlacklist(blacklistAdditions,
+ new ArrayList<String>());
+ Assert.assertFalse(appSchedulingInfo.getAndResetBlacklistChanged());
+
+ ArrayList<String> blacklistRemovals = new ArrayList<String>();
+ blacklistRemovals.add("node1");
+ appSchedulingInfo.updateBlacklist(new ArrayList<String>(),
+ blacklistRemovals);
+ appSchedulingInfo.updateBlacklist(new ArrayList<String>(),
+ blacklistRemovals);
+ Assert.assertTrue(appSchedulingInfo.getAndResetBlacklistChanged());
+
+ appSchedulingInfo.updateBlacklist(new ArrayList<String>(),
+ blacklistRemovals);
+ Assert.assertFalse(appSchedulingInfo.getAndResetBlacklistChanged());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d04f26d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
index 7aa62a8..d29d92c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
@@ -18,16 +18,30 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
@@ -256,6 +270,73 @@ public class TestFSAppAttempt extends FairSchedulerTestBase {
);
}
+ @Test
+ public void testHeadroomWithBlackListedNodes() {
+ // Add two nodes
+ RMNode node1 =
+ MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
+ "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+ RMNode node2 =
+ MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 2,
+ "127.0.0.2");
+ NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+ scheduler.handle(nodeEvent2);
+ assertEquals("We should have two alive nodes.",
+ 2, scheduler.getNumClusterNodes());
+ Resource clusterResource = scheduler.getClusterResource();
+ Resource clusterUsage = scheduler.getRootQueueMetrics()
+ .getAllocatedResources();
+ assertEquals(12 * 1024, clusterResource.getMemory());
+ assertEquals(12, clusterResource.getVirtualCores());
+ assertEquals(0, clusterUsage.getMemory());
+ assertEquals(0, clusterUsage.getVirtualCores());
+ ApplicationAttemptId id11 = createAppAttemptId(1, 1);
+ createMockRMApp(id11);
+ scheduler.addApplication(id11.getApplicationId(),
+ "default", "user1", false);
+ scheduler.addApplicationAttempt(id11, false, false);
+ assertNotNull(scheduler.getSchedulerApplications().get(id11.
+ getApplicationId()));
+ FSAppAttempt app = scheduler.getSchedulerApp(id11);
+ assertNotNull(app);
+ Resource queueUsage = app.getQueue().getResourceUsage();
+ assertEquals(0, queueUsage.getMemory());
+ assertEquals(0, queueUsage.getVirtualCores());
+ SchedulerNode n1 = scheduler.getSchedulerNode(node1.getNodeID());
+ SchedulerNode n2 = scheduler.getSchedulerNode(node2.getNodeID());
+ assertNotNull(n1);
+ assertNotNull(n2);
+ List<String> blacklistAdditions = new ArrayList<String>(1);
+ List<String> blacklistRemovals = new ArrayList<String>(1);
+ blacklistAdditions.add(n1.getNodeName());
+ app.updateBlacklist(blacklistAdditions, blacklistRemovals);
+ app.getQueue().setFairShare(clusterResource);
+ FSAppAttempt spyApp = spy(app);
+ doReturn(false)
+ .when(spyApp).isWaitingForAMContainer();
+ assertTrue(spyApp.isBlacklisted(n1.getNodeName()));
+ assertFalse(spyApp.isBlacklisted(n2.getNodeName()));
+ assertEquals(n2.getAvailableResource(), spyApp.getHeadroom());
+
+ blacklistAdditions.clear();
+ blacklistAdditions.add(n2.getNodeName());
+ blacklistRemovals.add(n1.getNodeName());
+ app.updateBlacklist(blacklistAdditions, blacklistRemovals);
+ assertFalse(spyApp.isBlacklisted(n1.getNodeName()));
+ assertTrue(spyApp.isBlacklisted(n2.getNodeName()));
+ assertEquals(n1.getAvailableResource(), spyApp.getHeadroom());
+
+ blacklistAdditions.clear();
+ blacklistRemovals.clear();
+ blacklistRemovals.add(n2.getNodeName());
+ app.updateBlacklist(blacklistAdditions, blacklistRemovals);
+ assertFalse(spyApp.isBlacklisted(n1.getNodeName()));
+ assertFalse(spyApp.isBlacklisted(n2.getNodeName()));
+ assertEquals(clusterResource, spyApp.getHeadroom());
+ }
+
private static int min(int value1, int value2, int value3) {
return Math.min(Math.min(value1, value2), value3);
}