You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2016/06/16 02:07:11 UTC

[2/5] hadoop git commit: Y-5181. v1

Y-5181. v1


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d781c25f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d781c25f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d781c25f

Branch: refs/heads/fs-preemption
Commit: d781c25f46a217d945177f98a0efed22d6513bc7
Parents: ec5b5ec
Author: Karthik Kambatla <ka...@apache.org>
Authored: Mon May 30 23:24:37 2016 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Mon May 30 23:29:00 2016 -0700

----------------------------------------------------------------------
 .../scheduler/ClusterNodeTracker.java           | 55 ++++++++++++----
 .../scheduler/TestClusterNodeTracker.java       | 68 ++++++++++++++++++++
 2 files changed, 111 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d781c25f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
index feb071f..9ff83fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
@@ -18,11 +18,14 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections.map.HashedMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -50,7 +53,8 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
   private Lock writeLock = readWriteLock.writeLock();
 
   private HashMap<NodeId, N> nodes = new HashMap<>();
-  private Map<String, Integer> nodesPerRack = new HashMap<>();
+  private Map<String, N> nodeNameToNodeMap = new HashMap<>();
+  private Map<String, List<N>> nodesPerRack = new HashMap<>();
 
   private Resource clusterCapacity = Resources.clone(Resources.none());
   private Resource staleClusterCapacity = null;
@@ -66,14 +70,16 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
     writeLock.lock();
     try {
       nodes.put(node.getNodeID(), node);
+      nodeNameToNodeMap.put(node.getNodeName(), node);
 
       // Update nodes per rack as well
       String rackName = node.getRackName();
-      Integer numNodes = nodesPerRack.get(rackName);
-      if (numNodes == null) {
-        numNodes = 0;
+      List<N> nodesList = nodesPerRack.get(rackName);
+      if (nodesList == null) {
+        nodesList = new ArrayList<>();
+        nodesPerRack.put(rackName, nodesList);
       }
-      nodesPerRack.put(rackName, ++numNodes);
+      nodesList.add(node);
 
       // Update cluster capacity
       Resources.addTo(clusterCapacity, node.getTotalResource());
@@ -126,8 +132,8 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
     readLock.lock();
     String rName = rackName == null ? "NULL" : rackName;
     try {
-      Integer nodeCount = nodesPerRack.get(rName);
-      return nodeCount == null ? 0 : nodeCount;
+      List<N> nodesList = nodesPerRack.get(rName);
+      return nodesList == null ? 0 : nodesList.size();
     } finally {
       readLock.unlock();
     }
@@ -154,14 +160,18 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
         LOG.warn("Attempting to remove a non-existent node " + nodeId);
         return null;
       }
+      nodeNameToNodeMap.remove(node.getNodeName());
 
       // Update nodes per rack as well
       String rackName = node.getRackName();
-      Integer numNodes = nodesPerRack.get(rackName);
-      if (numNodes > 0) {
-        nodesPerRack.put(rackName, --numNodes);
-      } else {
+      List<N> nodesList = nodesPerRack.get(rackName);
+      if (nodesList == null) {
         LOG.error("Attempting to remove node from an empty rack " + rackName);
+      } else {
+        nodesList.remove(node);
+        if (nodesList.isEmpty()) {
+          nodesPerRack.remove(rackName);
+        }
       }
 
       // Update cluster capacity
@@ -254,7 +264,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
   }
 
   public List<N> getAllNodes() {
-    return getNodes(null);
+    return getNodes((NodeFilter)null);
   }
 
   /**
@@ -297,4 +307,25 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
     Collections.sort(sortedList, comparator);
     return sortedList;
   }
+
+  /**
+   * Convenience method to return list of nodes corresponding to resourceName
+   * passed in the {@link ResourceRequest}.
+   *
+   * The name is resolved in this order: {@link ResourceRequest#ANY}, then a
+   * node name, then a rack name. For node and rack names the result is a
+   * new list owned by the caller, never the tracker's internal per-rack
+   * list, so changes to it do not affect the tracker.
+   *
+   * @param resourceName resource name to look up; must not be null or empty
+   * @return the matching nodes; an empty list if nothing matches
+   * @throws IllegalArgumentException if resourceName is null or empty
+   */
+  public List<N> getNodes(final String resourceName) {
+    Preconditions.checkArgument(
+        resourceName != null && !resourceName.isEmpty());
+    if (ResourceRequest.ANY.equals(resourceName)) {
+      return getAllNodes();
+    }
+
+    // Named retNodes so that it does not shadow the "nodes" field.
+    List<N> retNodes = new ArrayList<>();
+    // Both maps are plain HashMaps that are modified under the write lock,
+    // so they are only read while holding the read lock.
+    readLock.lock();
+    try {
+      if (nodeNameToNodeMap.containsKey(resourceName)) {
+        retNodes.add(nodeNameToNodeMap.get(resourceName));
+      } else if (nodesPerRack.containsKey(resourceName)) {
+        // Copy the entries instead of handing out the internal list.
+        retNodes.addAll(nodesPerRack.get(resourceName));
+      } else {
+        LOG.info("Could not find a node matching given resourceName "
+            + resourceName);
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return retNodes;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d781c25f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java
new file mode 100644
index 0000000..06e7dc8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test class to verify ClusterNodeTracker. Using FSSchedulerNode without
+ * loss of generality.
+ */
+public class TestClusterNodeTracker {
+  // Diamond operator instead of the raw type, so the assignment is checked.
+  private final ClusterNodeTracker<FSSchedulerNode> nodeTracker =
+      new ClusterNodeTracker<>();
+
+  /**
+   * Adds the mock nodes created by MockNodes.newNodes(2, 4, ...) to the
+   * tracker. The tests below expect this to result in 8 nodes in total,
+   * 4 of them on "rack0" and one of them named "host0".
+   */
+  @Before
+  public void setup() {
+    List<RMNode> rmNodes =
+        MockNodes.newNodes(2, 4, Resource.newInstance(4096, 4));
+    for (RMNode rmNode : rmNodes) {
+      nodeTracker.addNode(new FSSchedulerNode(rmNode, false));
+    }
+  }
+
+  /**
+   * Verifies the node count for the whole cluster and for a single rack.
+   */
+  @Test
+  public void testGetNodeCount() {
+    assertEquals("Incorrect number of nodes in the cluster",
+        8, nodeTracker.nodeCount());
+
+    assertEquals("Incorrect number of nodes in each rack",
+        4, nodeTracker.nodeCount("rack0"));
+  }
+
+  /**
+   * Verifies the number of nodes returned by getNodes(String) for
+   * ResourceRequest.ANY, for a rack name and for a node name.
+   */
+  @Test
+  public void testGetNodesForResourceName() {
+    assertEquals("Incorrect number of nodes matching ANY",
+        8, nodeTracker.getNodes(ResourceRequest.ANY).size());
+
+    assertEquals("Incorrect number of nodes matching rack",
+        4, nodeTracker.getNodes("rack0").size());
+
+    assertEquals("Incorrect number of nodes matching node",
+        1, nodeTracker.getNodes("host0").size());
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org