You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/07/27 11:33:30 UTC

[35/50] [abbrv] hadoop git commit: YARN-7965. NodeAttributeManager add/get API is not working properly. Contributed by Weiwei Yang.

YARN-7965. NodeAttributeManager add/get API is not working properly. Contributed by Weiwei Yang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e6b84252
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e6b84252
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e6b84252

Branch: refs/heads/YARN-3409
Commit: e6b84252588c9f4da73f6dead23dc60991596c05
Parents: d3a8187
Author: Naganarasimha <na...@apache.org>
Authored: Tue Feb 27 18:46:16 2018 +0800
Committer: Sunil G <su...@apache.org>
Committed: Fri Jul 27 16:40:34 2018 +0530

----------------------------------------------------------------------
 .../yarn/nodelabels/NodeAttributesManager.java  |   8 +-
 .../nodelabels/NodeAttributesManagerImpl.java   |  38 ++-
 .../nodelabels/TestNodeAttributesManager.java   | 258 +++++++++++++++++++
 3 files changed, 288 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6b84252/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
index 63f3dcf..effda9b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
@@ -69,10 +69,14 @@ public abstract class NodeAttributesManager extends AbstractService {
       Map<String, Set<NodeAttribute>> nodeAttributeMapping) throws IOException;
 
   /**
+   * Returns a set of node attributes whose prefix is one of the given
+   * prefixes; if the prefix set is null or empty, all attributes are returned;
+   * if prefix set is given but no mapping could be found, an empty set
+   * is returned.
+   *
    * @param prefix set of prefix string's for which the attributes needs to
    *          returned
-   * @return set of node Attributes objects for the specified set of prefixes,
-   *         else return all
+   * @return set of node Attributes
    */
   public abstract Set<NodeAttribute> getClusterNodeAttributes(
       Set<String> prefix);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6b84252/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
index 2e63a7c..a902ac6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
@@ -27,6 +27,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentHashMap.KeySetView;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@@ -141,6 +142,7 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
         Host node = nodeCollections.get(nodeHost);
         if (node == null) {
           node = new Host(nodeHost);
+          nodeCollections.put(nodeHost, node);
         }
         switch (op) {
         case REMOVE:
@@ -181,8 +183,16 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
 
   private void removeNodeFromAttributes(String nodeHost,
       Set<NodeAttribute> attributeMappings) {
-    for (NodeAttribute attribute : attributeMappings) {
-      clusterAttributes.get(attribute).removeNode(nodeHost);
+    for (NodeAttribute rmAttribute : attributeMappings) {
+      RMNodeAttribute host = clusterAttributes.get(rmAttribute);
+      if (host != null) {
+        host.removeNode(nodeHost);
+        // If there is no other host has such attribute,
+        // remove it from the global mapping.
+        if (host.getAssociatedNodeIds().isEmpty()) {
+          clusterAttributes.remove(rmAttribute);
+        }
+      }
     }
   }
 
@@ -305,19 +315,19 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
   @Override
   public Set<NodeAttribute> getClusterNodeAttributes(Set<String> prefix) {
     Set<NodeAttribute> attributes = new HashSet<>();
-    try {
-      readLock.lock();
-      attributes.addAll(clusterAttributes.keySet());
-    } finally {
-      readLock.unlock();
+    KeySetView<NodeAttribute, RMNodeAttribute> allAttributes =
+        clusterAttributes.keySet();
+    // Return all if prefix is not given.
+    if (prefix == null || prefix.isEmpty()) {
+      attributes.addAll(allAttributes);
+      return attributes;
     }
-    if (prefix != null && prefix.isEmpty()) {
-      Iterator<NodeAttribute> iterator = attributes.iterator();
-      while (iterator.hasNext()) {
-        NodeAttribute attribute = iterator.next();
-        if (!prefix.contains(attribute.getAttributePrefix())) {
-          iterator.remove();
-        }
+    // Try search attributes by prefix and return valid ones.
+    Iterator<NodeAttribute> iterator = allAttributes.iterator();
+    while (iterator.hasNext()) {
+      NodeAttribute current = iterator.next();
+      if (prefix.contains(current.getAttributePrefix())) {
+        attributes.add(current);
       }
     }
     return attributes;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6b84252/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java
new file mode 100644
index 0000000..b639a74
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeAttributeType;
+import org.apache.hadoop.yarn.nodelabels.AttributeValue;
+import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
+import org.junit.Test;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Assert;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Unit tests for node attribute manager.
+ */
+public class TestNodeAttributesManager {
+
+  private NodeAttributesManager attributesManager;
+  private final static String[] PREFIXES =
+      new String[] {"yarn.test1.io", "yarn.test2.io", "yarn.test3.io"};
+  private final static String[] HOSTNAMES =
+      new String[] {"host1", "host2", "host3"};
+
+  @Before
+  public void init() {
+    Configuration conf = new Configuration();
+    attributesManager = new NodeAttributesManagerImpl();
+    attributesManager.init(conf);
+    attributesManager.start();
+  }
+
+  @After
+  public void cleanUp() {
+    if (attributesManager != null) {
+      attributesManager.stop();
+    }
+  }
+
+  private Set<NodeAttribute> createAttributesForTest(String attributePrefix,
+      int numOfAttributes, String attributeNamePrefix,
+      String attributeValuePrefix) {
+    Set<NodeAttribute> attributes = new HashSet<>();
+    for (int i = 0; i< numOfAttributes; i++) {
+      NodeAttribute attribute = NodeAttribute.newInstance(
+          attributePrefix, attributeNamePrefix + "_" + i,
+          NodeAttributeType.STRING, attributeValuePrefix + "_" + i);
+      attributes.add(attribute);
+    }
+    return attributes;
+  }
+
+  private boolean sameAttributeSet(Set<NodeAttribute> set1,
+      Set<NodeAttribute> set2) {
+    return Sets.difference(set1, set2).isEmpty();
+  }
+
+  @Test
+  public void testAddNodeAttributes() throws IOException {
+    Map<String, Set<NodeAttribute>> toAddAttributes = new HashMap<>();
+    Map<NodeAttribute, AttributeValue> nodeAttributes;
+
+    // Add 3 attributes to host1
+    //  yarn.test1.io/A1=host1_v1_1
+    //  yarn.test1.io/A2=host1_v1_2
+    //  yarn.test1.io/A3=host1_v1_3
+    toAddAttributes.put(HOSTNAMES[0],
+        createAttributesForTest(PREFIXES[0], 3, "A", "host1_v1"));
+
+    attributesManager.addNodeAttributes(toAddAttributes);
+    nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
+
+    Assert.assertEquals(3, nodeAttributes.size());
+    Assert.assertTrue(sameAttributeSet(toAddAttributes.get(HOSTNAMES[0]),
+        nodeAttributes.keySet()));
+
+    // Add 2 attributes to host2
+    //  yarn.test1.io/A1=host2_v1_1
+    //  yarn.test1.io/A2=host2_v1_2
+    toAddAttributes.clear();
+    toAddAttributes.put(HOSTNAMES[1],
+        createAttributesForTest(PREFIXES[0], 2, "A", "host2_v1"));
+    attributesManager.addNodeAttributes(toAddAttributes);
+
+    // Verify host1 attributes are still valid.
+    nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
+    Assert.assertEquals(3, nodeAttributes.size());
+
+    // Verify new added host2 attributes are correctly updated.
+    nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[1]);
+    Assert.assertEquals(2, nodeAttributes.size());
+    Assert.assertTrue(sameAttributeSet(toAddAttributes.get(HOSTNAMES[1]),
+        nodeAttributes.keySet()));
+
+    // Cluster wide, it only has 3 attributes.
+    //  yarn.test1.io/A1
+    //  yarn.test1.io/A2
+    //  yarn.test1.io/A3
+    Set<NodeAttribute> clusterAttributes = attributesManager
+        .getClusterNodeAttributes(Sets.newHashSet(PREFIXES[0]));
+    Assert.assertEquals(3, clusterAttributes.size());
+
+    // Query for attributes under a non-exist prefix,
+    // ensure it returns an empty set.
+    clusterAttributes = attributesManager
+        .getClusterNodeAttributes(Sets.newHashSet("non_exist_prefix"));
+    Assert.assertEquals(0, clusterAttributes.size());
+
+    // Not provide any prefix, ensure it returns all attributes.
+    clusterAttributes = attributesManager.getClusterNodeAttributes(null);
+    Assert.assertEquals(3, clusterAttributes.size());
+
+    // Add some other attributes with different prefixes on host1 and host2.
+    toAddAttributes.clear();
+
+    // Host1
+    //  yarn.test2.io/A_1=host1_v2_1
+    //  ...
+    //  yarn.test2.io/A_10=host1_v2_10
+    toAddAttributes.put(HOSTNAMES[0],
+        createAttributesForTest(PREFIXES[1], 10, "C", "host1_v2"));
+    // Host2
+    //  yarn.test2.io/C_1=host1_v2_1
+    //  ...
+    //  yarn.test2.io/C_20=host1_v2_20
+    toAddAttributes.put(HOSTNAMES[1],
+        createAttributesForTest(PREFIXES[1], 20, "C", "host1_v2"));
+    attributesManager.addNodeAttributes(toAddAttributes);
+
+    nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
+    Assert.assertEquals(13, nodeAttributes.size());
+
+    nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[1]);
+    Assert.assertEquals(22, nodeAttributes.size());
+  }
+
+  @Test
+  public void testRemoveNodeAttributes() throws IOException {
+    Map<String, Set<NodeAttribute>> toAddAttributes = new HashMap<>();
+    Map<String, Set<NodeAttribute>> toRemoveAttributes = new HashMap<>();
+    Set<NodeAttribute> allAttributesPerPrefix = new HashSet<>();
+    Map<NodeAttribute, AttributeValue> nodeAttributes;
+
+    // Host1 -----------------------
+    //  yarn.test1.io
+    //    A1=host1_v1_1
+    //    A2=host1_v1_2
+    //    A3=host1_v1_3
+    //  yarn.test2.io
+    //    B1=host1_v2_1
+    //    ...
+    //    B5=host5_v2_5
+    // Host2 -----------------------
+    //  yarn.test1.io
+    //    A1=host2_v1_1
+    //    A2=host2_v1_2
+    //  yarn.test3.io
+    //    C1=host2_v3_1
+    //    c2=host2_v3_2
+    Set<NodeAttribute> host1set = new HashSet<>();
+    Set<NodeAttribute> host1set1 =
+        createAttributesForTest(PREFIXES[0], 3, "A", "host1_v1");
+    Set<NodeAttribute> host1set2 =
+        createAttributesForTest(PREFIXES[1], 5, "B", "host1_v1");
+    host1set.addAll(host1set1);
+    host1set.addAll(host1set2);
+
+    Set<NodeAttribute> host2set = new HashSet<>();
+    Set<NodeAttribute> host2set1 =
+        createAttributesForTest(PREFIXES[0], 2, "A", "host2_v1");
+    Set<NodeAttribute> host2set2 =
+        createAttributesForTest(PREFIXES[2], 2, "C", "host2_v3");
+    host2set.addAll(host2set1);
+    host2set.addAll(host2set2);
+
+    toAddAttributes.put(HOSTNAMES[0], host1set);
+    toAddAttributes.put(HOSTNAMES[1], host2set);
+    attributesManager.addNodeAttributes(toAddAttributes);
+
+    nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
+    Assert.assertEquals(8, nodeAttributes.size());
+
+    nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[1]);
+    Assert.assertEquals(4, nodeAttributes.size());
+
+    allAttributesPerPrefix = attributesManager
+        .getClusterNodeAttributes(Sets.newHashSet(PREFIXES[0]));
+    Assert.assertEquals(3, allAttributesPerPrefix.size());
+    allAttributesPerPrefix = attributesManager
+        .getClusterNodeAttributes(Sets.newHashSet(PREFIXES[1]));
+    Assert.assertEquals(5, allAttributesPerPrefix.size());
+    allAttributesPerPrefix = attributesManager
+        .getClusterNodeAttributes(Sets.newHashSet(PREFIXES[2]));
+    Assert.assertEquals(2, allAttributesPerPrefix.size());
+
+    // Remove "yarn.test1.io/A_2" from host1
+    Set<NodeAttribute> attributes2rm1 = new HashSet<>();
+    attributes2rm1.add(NodeAttribute.newInstance(PREFIXES[0], "A_2",
+        NodeAttributeType.STRING, "anyValue"));
+    toRemoveAttributes.put(HOSTNAMES[0], attributes2rm1);
+    attributesManager.removeNodeAttributes(toRemoveAttributes);
+
+    nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
+    Assert.assertEquals(7, nodeAttributes.size());
+
+    // Remove again, but give a non-exist attribute name
+    attributes2rm1.clear();
+    toRemoveAttributes.clear();
+    attributes2rm1.add(NodeAttribute.newInstance(PREFIXES[0], "non_exist_name",
+        NodeAttributeType.STRING, "anyValue"));
+    toRemoveAttributes.put(HOSTNAMES[0], attributes2rm1);
+    attributesManager.removeNodeAttributes(toRemoveAttributes);
+
+    nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
+    Assert.assertEquals(7, nodeAttributes.size());
+
+    // Remove "yarn.test1.io/A_2" from host2 too,
+    // by then there will be no such attribute exist in the cluster.
+    Set<NodeAttribute> attributes2rm2 = new HashSet<>();
+    attributes2rm2.add(NodeAttribute.newInstance(PREFIXES[0], "A_2",
+        NodeAttributeType.STRING, "anyValue"));
+    toRemoveAttributes.clear();
+    toRemoveAttributes.put(HOSTNAMES[1], attributes2rm2);
+    attributesManager.removeNodeAttributes(toRemoveAttributes);
+
+    // Make sure cluster wide attributes are still consistent.
+    // Since both host1 and host2 doesn't have "yarn.test1.io/A_2",
+    // get all attributes under prefix "yarn.test1.io" should only return
+    // us A_1 and A_3.
+    allAttributesPerPrefix = attributesManager
+        .getClusterNodeAttributes(Sets.newHashSet(PREFIXES[0]));
+    Assert.assertEquals(2, allAttributesPerPrefix.size());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org