You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2019/08/13 13:48:20 UTC

[hadoop] 02/02: YARN-9135. NM State store ResourceMappings serialization is tested with Strings instead of real Device objects. Contributed by Peter Bacsko

This is an automated email from the ASF dual-hosted git repository.

snemeth pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit cb91ab73b088ad68c5757cff3734d2667f5cb71c
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Fri Jul 12 17:20:42 2019 +0200

    YARN-9135. NM State store ResourceMappings serialization is tested with Strings instead of real Device objects. Contributed by Peter Bacsko
    
    (cherry picked from commit 8b3c6791b13fc57891cf81e83d4b626b4f2932e6)
---
 .../resources/numa/NumaResourceAllocation.java     | 59 ++++++++++++++--------
 .../resources/numa/NumaResourceAllocator.java      | 34 ++++++++-----
 .../recovery/NMLeveldbStateStoreService.java       |  5 +-
 .../recovery/TestNMLeveldbStateStoreService.java   | 52 +++++++++++--------
 4 files changed, 91 insertions(+), 59 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java
index f8d4739..e91ac3e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java
@@ -17,9 +17,11 @@
  */
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
 
+import com.google.common.collect.ImmutableMap;
+
 import java.io.Serializable;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
 /**
@@ -28,27 +30,18 @@ import java.util.Set;
  */
 public class NumaResourceAllocation implements Serializable {
   private static final long serialVersionUID = 6339719798446595123L;
-  private Map<String, Long> nodeVsMemory;
-  private Map<String, Integer> nodeVsCpus;
+  private final ImmutableMap<String, Long> nodeVsMemory;
+  private final ImmutableMap<String, Integer> nodeVsCpus;
 
-  public NumaResourceAllocation() {
-    nodeVsMemory = new HashMap<>();
-    nodeVsCpus = new HashMap<>();
+  public NumaResourceAllocation(Map<String, Long> memoryAllocations,
+      Map<String, Integer> cpuAllocations) {
+    nodeVsMemory = ImmutableMap.copyOf(memoryAllocations);
+    nodeVsCpus = ImmutableMap.copyOf(cpuAllocations);
   }
 
   public NumaResourceAllocation(String memNodeId, long memory, String cpuNodeId,
       int cpus) {
-    this();
-    nodeVsMemory.put(memNodeId, memory);
-    nodeVsCpus.put(cpuNodeId, cpus);
-  }
-
-  public void addMemoryNode(String memNodeId, long memory) {
-    nodeVsMemory.put(memNodeId, memory);
-  }
-
-  public void addCpuNode(String cpuNodeId, int cpus) {
-    nodeVsCpus.put(cpuNodeId, cpus);
+    this(ImmutableMap.of(memNodeId, memory), ImmutableMap.of(cpuNodeId, cpus));
   }
 
   public Set<String> getMemNodes() {
@@ -59,11 +52,37 @@ public class NumaResourceAllocation implements Serializable {
     return nodeVsCpus.keySet();
   }
 
-  public Map<String, Long> getNodeVsMemory() {
+  public ImmutableMap<String, Long> getNodeVsMemory() {
     return nodeVsMemory;
   }
 
-  public Map<String, Integer> getNodeVsCpus() {
+  public ImmutableMap<String, Integer> getNodeVsCpus() {
     return nodeVsCpus;
   }
-}
+
+  @Override
+  public String toString() {
+    return "NumaResourceAllocation{" +
+        "nodeVsMemory=" + nodeVsMemory +
+        ", nodeVsCpus=" + nodeVsCpus +
+        '}';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    NumaResourceAllocation that = (NumaResourceAllocation) o;
+    return Objects.equals(nodeVsMemory, that.nodeVsMemory) &&
+        Objects.equals(nodeVsCpus, that.nodeVsCpus);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(nodeVsMemory, nodeVsCpus);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java
index e152bda..7b49b1a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -247,17 +248,19 @@ public class NumaResourceAllocator {
 
     // If there is no single node matched for the container resource
     // Check the NUMA nodes for Memory resources
-    NumaResourceAllocation assignedNumaNodeInfo = new NumaResourceAllocation();
-    long memreq = resource.getMemorySize();
+    long memoryRequirement = resource.getMemorySize();
+    Map<String, Long> memoryAllocations = Maps.newHashMap();
     for (NumaNodeResource numaNode : numaNodesList) {
-      long memrem = numaNode.assignAvailableMemory(memreq, containerId);
-      assignedNumaNodeInfo.addMemoryNode(numaNode.getNodeId(), memreq - memrem);
-      memreq = memrem;
-      if (memreq == 0) {
+      long memoryRemaining = numaNode.
+          assignAvailableMemory(memoryRequirement, containerId);
+      memoryAllocations.put(numaNode.getNodeId(),
+          memoryRequirement - memoryRemaining);
+      memoryRequirement = memoryRemaining;
+      if (memoryRequirement == 0) {
         break;
       }
     }
-    if (memreq != 0) {
+    if (memoryRequirement != 0) {
       LOG.info("There is no available memory:" + resource.getMemorySize()
           + " in numa nodes for " + containerId);
       releaseNumaResource(containerId);
@@ -265,26 +268,31 @@ public class NumaResourceAllocator {
     }
 
     // Check the NUMA nodes for CPU resources
-    int cpusreq = resource.getVirtualCores();
+    int cpusRequirement = resource.getVirtualCores();
+    Map<String, Integer> cpuAllocations = Maps.newHashMap();
     for (int index = 0; index < numaNodesList.size(); index++) {
       NumaNodeResource numaNode = numaNodesList
           .get((currentAssignNode + index) % numaNodesList.size());
-      int cpusrem = numaNode.assignAvailableCpus(cpusreq, containerId);
-      assignedNumaNodeInfo.addCpuNode(numaNode.getNodeId(), cpusreq - cpusrem);
-      cpusreq = cpusrem;
-      if (cpusreq == 0) {
+      int cpusRemaining = numaNode.
+          assignAvailableCpus(cpusRequirement, containerId);
+      cpuAllocations.put(numaNode.getNodeId(), cpusRequirement - cpusRemaining);
+      cpusRequirement = cpusRemaining;
+      if (cpusRequirement == 0) {
         currentAssignNode = (currentAssignNode + index + 1)
             % numaNodesList.size();
         break;
       }
     }
 
-    if (cpusreq != 0) {
+    if (cpusRequirement != 0) {
       LOG.info("There are no available cpus:" + resource.getVirtualCores()
           + " in numa nodes for " + containerId);
       releaseNumaResource(containerId);
       return null;
     }
+
+    NumaResourceAllocation assignedNumaNodeInfo =
+        new NumaResourceAllocation(memoryAllocations, cpuAllocations);
     LOG.info("Assigning multiple NUMA nodes ("
         + StringUtils.join(",", assignedNumaNodeInfo.getMemNodes())
         + ") for memory, ("
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index 1d7771a..8de94a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -1459,8 +1459,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     String keyResChng = CONTAINERS_KEY_PREFIX + container.getContainerId().toString()
         + CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX + resourceType;
     try {
-      WriteBatch batch = db.createWriteBatch();
-      try {
+      try (WriteBatch batch = db.createWriteBatch()) {
         ResourceMappings.AssignedResources res =
             new ResourceMappings.AssignedResources();
         res.updateAssignedResources(assignedResources);
@@ -1468,8 +1467,6 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         // New value will overwrite old values for the same key
         batch.put(bytes(keyResChng), res.toBytes());
         db.write(batch);
-      } finally {
-        batch.close();
       }
     } catch (DBException e) {
       markStoreUnHealthy(e);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index 87208f7..c4c194c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -75,6 +75,9 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDevice;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
@@ -1448,7 +1451,7 @@ public class TestNMLeveldbStateStoreService {
 
   @Test
   public void testStateStoreForResourceMapping() throws IOException {
-    // test empty when no state
+    // test that stateStore is initially empty
     List<RecoveredContainerState> recoveredContainers =
         loadContainersState(stateStore.getContainerStateIterator());
     assertTrue(recoveredContainers.isEmpty());
@@ -1464,38 +1467,43 @@ public class TestNMLeveldbStateStoreService {
     ResourceMappings resourceMappings = new ResourceMappings();
     when(container.getResourceMappings()).thenReturn(resourceMappings);
 
-    // Store ResourceMapping
     stateStore.storeAssignedResources(container, "gpu",
-        Arrays.asList("1", "2", "3"));
-    // This will overwrite above
-    List<Serializable> gpuRes1 = Arrays.asList("1", "2", "4");
+        Arrays.asList(new GpuDevice(1, 1), new GpuDevice(2, 2),
+            new GpuDevice(3, 3)));
+
+    // This will overwrite the above
+    List<Serializable> gpuRes1 = Arrays.asList(
+        new GpuDevice(1, 1), new GpuDevice(2, 2), new GpuDevice(4, 4));
     stateStore.storeAssignedResources(container, "gpu", gpuRes1);
-    List<Serializable> fpgaRes = Arrays.asList("3", "4", "5", "6");
+
+    List<Serializable> fpgaRes = Arrays.asList(
+        new FpgaDevice("testType", 3, 3, "testIPID"),
+        new FpgaDevice("testType", 4, 4, "testIPID"),
+        new FpgaDevice("testType", 5, 5, "testIPID"),
+        new FpgaDevice("testType", 6, 6, "testIPID"));
     stateStore.storeAssignedResources(container, "fpga", fpgaRes);
-    List<Serializable> numaRes = Arrays.asList("numa1");
+
+    List<Serializable> numaRes = Arrays.asList(
+        new NumaResourceAllocation("testmemNodeId", 2048, "testCpuNodeId", 10));
     stateStore.storeAssignedResources(container, "numa", numaRes);
 
-    // add a invalid key
     restartStateStore();
     recoveredContainers =
         loadContainersState(stateStore.getContainerStateIterator());
     assertEquals(1, recoveredContainers.size());
     RecoveredContainerState rcs = recoveredContainers.get(0);
-    List<Serializable> res = rcs.getResourceMappings()
+    List<Serializable> resources = rcs.getResourceMappings()
         .getAssignedResources("gpu");
-    Assert.assertTrue(res.equals(gpuRes1));
-    Assert.assertTrue(
-        resourceMappings.getAssignedResources("gpu").equals(gpuRes1));
-
-    res = rcs.getResourceMappings().getAssignedResources("fpga");
-    Assert.assertTrue(res.equals(fpgaRes));
-    Assert.assertTrue(
-        resourceMappings.getAssignedResources("fpga").equals(fpgaRes));
-
-    res = rcs.getResourceMappings().getAssignedResources("numa");
-    Assert.assertTrue(res.equals(numaRes));
-    Assert.assertTrue(
-        resourceMappings.getAssignedResources("numa").equals(numaRes));
+    Assert.assertEquals(gpuRes1, resources);
+    Assert.assertEquals(gpuRes1, resourceMappings.getAssignedResources("gpu"));
+
+    resources = rcs.getResourceMappings().getAssignedResources("fpga");
+    Assert.assertEquals(fpgaRes, resources);
+    Assert.assertEquals(fpgaRes, resourceMappings.getAssignedResources("fpga"));
+
+    resources = rcs.getResourceMappings().getAssignedResources("numa");
+    Assert.assertEquals(numaRes, resources);
+    Assert.assertEquals(numaRes, resourceMappings.getAssignedResources("numa"));
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org