You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/01/10 02:50:25 UTC

hadoop git commit: YARN-3014. Replaces labels on a host should update all NM's labels on that host. Contributed by Wangda Tan (cherry picked from commit a2604062681230442eefae79815db5d77ccc4c2e)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 68709cb9b -> 36b3dcaab


YARN-3014. Replaces labels on a host should update all NM's labels on that host. Contributed by Wangda Tan
(cherry picked from commit a2604062681230442eefae79815db5d77ccc4c2e)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/36b3dcaa
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/36b3dcaa
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/36b3dcaa

Branch: refs/heads/branch-2
Commit: 36b3dcaab2f6dc62dc4761c2a04fe9385e2141ac
Parents: 68709cb
Author: Jian He <ji...@apache.org>
Authored: Fri Jan 9 17:49:53 2015 -0800
Committer: Jian He <ji...@apache.org>
Committed: Fri Jan 9 17:50:18 2015 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../nodelabels/CommonNodeLabelsManager.java     | 163 ++++++++-----------
 .../nodelabels/TestCommonNodeLabelsManager.java |  38 +++++
 .../nodelabels/RMNodeLabelsManager.java         |   1 -
 4 files changed, 113 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/36b3dcaa/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index e2f6250..839a472 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -310,6 +310,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2997. Fixed NodeStatusUpdater to not send alreay-sent completed
     container statuses on heartbeat. (Chengbing Liu via jianhe)
 
+    YARN-3014. Replaces labels on a host should update all NM's labels on that
+    host. (Wangda Tan via jianhe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/36b3dcaa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
index e888cc5..aeefff1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.nodelabels;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -130,6 +131,12 @@ public class CommonNodeLabelsManager extends AbstractService {
       return c;
     }
   }
+  
+  private enum NodeLabelUpdateOperation {
+    ADD,
+    REMOVE,
+    REPLACE
+  }
 
   private final class ForwardingEventHandler implements
       EventHandler<NodeLabelsStoreEvent> {
@@ -290,45 +297,6 @@ public class CommonNodeLabelsManager extends AbstractService {
     }
   }
   
-  @SuppressWarnings("unchecked")
-  protected void internalAddLabelsToNode(
-      Map<NodeId, Set<String>> addedLabelsToNode) throws IOException {
-    // do add labels to nodes
-    Map<NodeId, Set<String>> newNMToLabels =
-        new HashMap<NodeId, Set<String>>();
-    for (Entry<NodeId, Set<String>> entry : addedLabelsToNode.entrySet()) {
-      NodeId nodeId = entry.getKey();
-      Set<String> labels = entry.getValue();
- 
-      createHostIfNonExisted(nodeId.getHost());
-      if (nodeId.getPort() == WILDCARD_PORT) {
-        Host host = nodeCollections.get(nodeId.getHost());
-        host.labels.addAll(labels);
-        newNMToLabels.put(nodeId, host.labels);
-      } else {
-        createNodeIfNonExisted(nodeId);
-        Node nm = getNMInNodeSet(nodeId);
-        if (nm.labels == null) {
-          nm.labels = new HashSet<String>();
-        }
-        nm.labels.addAll(labels);
-        newNMToLabels.put(nodeId, nm.labels);
-      }
-    }
-
-    if (null != dispatcher) {
-      dispatcher.getEventHandler().handle(
-          new UpdateNodeToLabelsMappingsEvent(newNMToLabels));
-    }
-
-    // shows node->labels we added
-    LOG.info("addLabelsToNode:");
-    for (Entry<NodeId, Set<String>> entry : newNMToLabels.entrySet()) {
-      LOG.info("  NM=" + entry.getKey() + ", labels=["
-          + StringUtils.join(entry.getValue().iterator(), ",") + "]");
-    }
-  }
-  
   /**
    * add more labels to nodes
    * 
@@ -338,7 +306,7 @@ public class CommonNodeLabelsManager extends AbstractService {
       throws IOException {
     addedLabelsToNode = normalizeNodeIdToLabels(addedLabelsToNode);
     checkAddLabelsToNode(addedLabelsToNode);
-    internalAddLabelsToNode(addedLabelsToNode);
+    internalUpdateLabelsOnNodes(addedLabelsToNode, NodeLabelUpdateOperation.ADD);
   }
   
   protected void checkRemoveFromClusterNodeLabels(
@@ -469,24 +437,75 @@ public class CommonNodeLabelsManager extends AbstractService {
   }
   
   @SuppressWarnings("unchecked")
-  protected void internalRemoveLabelsFromNode(
-      Map<NodeId, Set<String>> removeLabelsFromNode) {
-    // do remove labels from nodes
+  protected void internalUpdateLabelsOnNodes(
+      Map<NodeId, Set<String>> nodeToLabels, NodeLabelUpdateOperation op)
+      throws IOException {
+    // do update labels from nodes
     Map<NodeId, Set<String>> newNMToLabels =
         new HashMap<NodeId, Set<String>>();
-    for (Entry<NodeId, Set<String>> entry : removeLabelsFromNode.entrySet()) {
+    for (Entry<NodeId, Set<String>> entry : nodeToLabels.entrySet()) {
       NodeId nodeId = entry.getKey();
       Set<String> labels = entry.getValue();
       
+      createHostIfNonExisted(nodeId.getHost());
       if (nodeId.getPort() == WILDCARD_PORT) {
         Host host = nodeCollections.get(nodeId.getHost());
-        host.labels.removeAll(labels);
+        switch (op) {
+        case REMOVE: 
+          host.labels.removeAll(labels);
+          for (Node node : host.nms.values()) {
+            if (node.labels != null) {
+              node.labels.removeAll(labels);
+            }
+          }
+          break;
+        case ADD:
+          host.labels.addAll(labels);
+          for (Node node : host.nms.values()) {
+            if (node.labels != null) {
+              node.labels.addAll(labels);
+            }
+          }
+          break;
+        case REPLACE:
+          host.labels.clear();
+          host.labels.addAll(labels);
+          for (Node node : host.nms.values()) {
+            node.labels = null;
+          }
+          break;
+        default:
+          break;
+        }
         newNMToLabels.put(nodeId, host.labels);
       } else {
-        Node nm = getNMInNodeSet(nodeId);
-        if (nm.labels != null) {
-          nm.labels.removeAll(labels);
+        if (EnumSet.of(NodeLabelUpdateOperation.ADD,
+            NodeLabelUpdateOperation.REPLACE).contains(op)) {
+          // Add and replace
+          createNodeIfNonExisted(nodeId);
+          Node nm = getNMInNodeSet(nodeId);
+          if (nm.labels == null) {
+            nm.labels = new HashSet<String>();
+          }
+          switch (op) {
+          case ADD:
+            nm.labels.addAll(labels);
+            break;
+          case REPLACE:
+            nm.labels.clear();
+            nm.labels.addAll(labels);
+            break;
+          default:
+            break;
+          }
           newNMToLabels.put(nodeId, nm.labels);
+        } else {
+          // remove
+          Node nm = getNMInNodeSet(nodeId);
+          if (nm.labels != null) {
+            nm.labels.removeAll(labels);
+            newNMToLabels.put(nodeId, nm.labels);
+          }
         }
       }
     }
@@ -497,7 +516,7 @@ public class CommonNodeLabelsManager extends AbstractService {
     }
 
     // shows node->labels we added
-    LOG.info("removeLabelsFromNode:");
+    LOG.info(op.name() + " labels on nodes:");
     for (Entry<NodeId, Set<String>> entry : newNMToLabels.entrySet()) {
       LOG.info("  NM=" + entry.getKey() + ", labels=["
           + StringUtils.join(entry.getValue().iterator(), ",") + "]");
@@ -517,7 +536,8 @@ public class CommonNodeLabelsManager extends AbstractService {
     
     checkRemoveLabelsFromNode(removeLabelsFromNode);
 
-    internalRemoveLabelsFromNode(removeLabelsFromNode);
+    internalUpdateLabelsOnNodes(removeLabelsFromNode,
+        NodeLabelUpdateOperation.REMOVE);
   }
   
   protected void checkReplaceLabelsOnNode(
@@ -539,47 +559,7 @@ public class CommonNodeLabelsManager extends AbstractService {
       }
     }
   }
-  
-  @SuppressWarnings("unchecked")
-  protected void internalReplaceLabelsOnNode(
-      Map<NodeId, Set<String>> replaceLabelsToNode) throws IOException {
-    // do replace labels to nodes
-    Map<NodeId, Set<String>> newNMToLabels = new HashMap<NodeId, Set<String>>();
-    for (Entry<NodeId, Set<String>> entry : replaceLabelsToNode.entrySet()) {
-      NodeId nodeId = entry.getKey();
-      Set<String> labels = entry.getValue();
 
-      createHostIfNonExisted(nodeId.getHost());      
-      if (nodeId.getPort() == WILDCARD_PORT) {
-        Host host = nodeCollections.get(nodeId.getHost());
-        host.labels.clear();
-        host.labels.addAll(labels);
-        newNMToLabels.put(nodeId, host.labels);
-      } else {
-        createNodeIfNonExisted(nodeId);
-        Node nm = getNMInNodeSet(nodeId);
-        if (nm.labels == null) {
-          nm.labels = new HashSet<String>();
-        }
-        nm.labels.clear();
-        nm.labels.addAll(labels);
-        newNMToLabels.put(nodeId, nm.labels);
-      }
-    }
-
-    if (null != dispatcher) {
-      dispatcher.getEventHandler().handle(
-          new UpdateNodeToLabelsMappingsEvent(newNMToLabels));
-    }
-
-    // shows node->labels we added
-    LOG.info("setLabelsToNode:");
-    for (Entry<NodeId, Set<String>> entry : newNMToLabels.entrySet()) {
-      LOG.info("  NM=" + entry.getKey() + ", labels=["
-          + StringUtils.join(entry.getValue().iterator(), ",") + "]");
-    }
-  }
-  
   /**
    * replace labels to nodes
    * 
@@ -591,7 +571,8 @@ public class CommonNodeLabelsManager extends AbstractService {
     
     checkReplaceLabelsOnNode(replaceLabelsToNode);
 
-    internalReplaceLabelsOnNode(replaceLabelsToNode);
+    internalUpdateLabelsOnNodes(replaceLabelsToNode,
+        NodeLabelUpdateOperation.REPLACE);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/36b3dcaa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestCommonNodeLabelsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestCommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestCommonNodeLabelsManager.java
index a56a595..c0b05e3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestCommonNodeLabelsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestCommonNodeLabelsManager.java
@@ -281,4 +281,42 @@ public class TestCommonNodeLabelsManager extends NodeLabelTestBase {
     mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("n1"), toSet("  p2 ")));
     Assert.assertTrue(mgr.getNodeLabels().isEmpty());
   }
+  
+  @Test(timeout = 5000)
+  public void testReplaceLabelsOnHostsShouldUpdateNodesBelongTo()
+      throws IOException {
+    mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
+    mgr.addLabelsToNode(ImmutableMap.of(toNodeId("n1"), toSet("p1", "p2")));
+    assertMapEquals(
+        mgr.getNodeLabels(),
+        ImmutableMap.of(toNodeId("n1"), toSet("p1", "p2")));
+    
+    // Replace labels on n1:1 to P2
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1:1"), toSet("p2"),
+        toNodeId("n1:2"), toSet("p2")));
+    assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"),
+        toSet("p1", "p2"), toNodeId("n1:1"), toSet("p2"), toNodeId("n1:2"),
+        toSet("p2")));
+    
+    // Replace labels on n1 to P1, both n1:1/n1 will be P1 now
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1")));
+    assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"),
+        toSet("p1"), toNodeId("n1:1"), toSet("p1"), toNodeId("n1:2"),
+        toSet("p1")));
+    
+    // Set labels on n1:1 to P2 again to verify if add/remove works
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1:1"), toSet("p2")));
+    
+    // Add p3 to n1, should makes n1:1 to be p2/p3, and n1:2 to be p1/p3
+    mgr.addLabelsToNode(ImmutableMap.of(toNodeId("n1"), toSet("p3")));
+    assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"),
+        toSet("p1", "p3"), toNodeId("n1:1"), toSet("p2", "p3"),
+        toNodeId("n1:2"), toSet("p1", "p3")));
+    
+    // Remove P3 from n1, should makes n1:1 to be p2, and n1:2 to be p1
+    mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("n1"), toSet("p3")));
+    assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"),
+        toSet("p1"), toNodeId("n1:1"), toSet("p2"), toNodeId("n1:2"),
+        toSet("p1")));
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/36b3dcaa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
index 828d1bc..18478e3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.nodelabels.NodeLabel;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;