You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2019/08/13 13:48:19 UTC
[hadoop] 01/02: Revert "YARN-9135. NM State store ResourceMappings
serialization are tested with Strings instead of real Device objects.
Contributed by Peter Bacsko"
This is an automated email from the ASF dual-hosted git repository.
snemeth pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit a762a6be2943ec54f72b294678d93fee6dbd8921
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Tue Aug 13 15:44:50 2019 +0200
Revert "YARN-9135. NM State store ResourceMappings serialization are tested with Strings instead of real Device objects. Contributed by Peter Bacsko"
This reverts commit b20fd9e21295add7e80f07b471bba5c76e433aed.
Commit is reverted since unnecessary files were added, accidentally.
---
.../resources/numa/NumaResourceAllocation.java | 59 ++++++++--------------
.../resources/numa/NumaResourceAllocator.java | 34 +++++--------
.../recovery/NMLeveldbStateStoreService.java | 5 +-
.../recovery/TestNMLeveldbStateStoreService.java | 52 ++++++++-----------
4 files changed, 59 insertions(+), 91 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java
index e91ac3e..f8d4739 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java
@@ -17,11 +17,9 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
-import com.google.common.collect.ImmutableMap;
-
import java.io.Serializable;
+import java.util.HashMap;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
/**
@@ -30,18 +28,27 @@ import java.util.Set;
*/
public class NumaResourceAllocation implements Serializable {
private static final long serialVersionUID = 6339719798446595123L;
- private final ImmutableMap<String, Long> nodeVsMemory;
- private final ImmutableMap<String, Integer> nodeVsCpus;
+ private Map<String, Long> nodeVsMemory;
+ private Map<String, Integer> nodeVsCpus;
- public NumaResourceAllocation(Map<String, Long> memoryAllocations,
- Map<String, Integer> cpuAllocations) {
- nodeVsMemory = ImmutableMap.copyOf(memoryAllocations);
- nodeVsCpus = ImmutableMap.copyOf(cpuAllocations);
+ public NumaResourceAllocation() {
+ nodeVsMemory = new HashMap<>();
+ nodeVsCpus = new HashMap<>();
}
public NumaResourceAllocation(String memNodeId, long memory, String cpuNodeId,
int cpus) {
- this(ImmutableMap.of(memNodeId, memory), ImmutableMap.of(cpuNodeId, cpus));
+ this();
+ nodeVsMemory.put(memNodeId, memory);
+ nodeVsCpus.put(cpuNodeId, cpus);
+ }
+
+ public void addMemoryNode(String memNodeId, long memory) {
+ nodeVsMemory.put(memNodeId, memory);
+ }
+
+ public void addCpuNode(String cpuNodeId, int cpus) {
+ nodeVsCpus.put(cpuNodeId, cpus);
}
public Set<String> getMemNodes() {
@@ -52,37 +59,11 @@ public class NumaResourceAllocation implements Serializable {
return nodeVsCpus.keySet();
}
- public ImmutableMap<String, Long> getNodeVsMemory() {
+ public Map<String, Long> getNodeVsMemory() {
return nodeVsMemory;
}
- public ImmutableMap<String, Integer> getNodeVsCpus() {
+ public Map<String, Integer> getNodeVsCpus() {
return nodeVsCpus;
}
-
- @Override
- public String toString() {
- return "NumaResourceAllocation{" +
- "nodeVsMemory=" + nodeVsMemory +
- ", nodeVsCpus=" + nodeVsCpus +
- '}';
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- NumaResourceAllocation that = (NumaResourceAllocation) o;
- return Objects.equals(nodeVsMemory, that.nodeVsMemory) &&
- Objects.equals(nodeVsCpus, that.nodeVsCpus);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(nodeVsMemory, nodeVsCpus);
- }
-}
\ No newline at end of file
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java
index f95e55e..e152bda 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java
@@ -31,7 +31,6 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.StringUtils;
@@ -248,19 +247,17 @@ public class NumaResourceAllocator {
// If there is no single node matched for the container resource
// Check the NUMA nodes for Memory resources
- long memoryRequirement = resource.getMemorySize();
- Map<String, Long> memoryAllocations = Maps.newHashMap();
+ NumaResourceAllocation assignedNumaNodeInfo = new NumaResourceAllocation();
+ long memreq = resource.getMemorySize();
for (NumaNodeResource numaNode : numaNodesList) {
- long memoryRemaining = numaNode.
- assignAvailableMemory(memoryRequirement, containerId);
- memoryAllocations.put(numaNode.getNodeId(),
- memoryRequirement - memoryRemaining);
- memoryRequirement = memoryRemaining;
- if (memoryRequirement == 0) {
+ long memrem = numaNode.assignAvailableMemory(memreq, containerId);
+ assignedNumaNodeInfo.addMemoryNode(numaNode.getNodeId(), memreq - memrem);
+ memreq = memrem;
+ if (memreq == 0) {
break;
}
}
- if (memoryRequirement != 0) {
+ if (memreq != 0) {
LOG.info("There is no available memory:" + resource.getMemorySize()
+ " in numa nodes for " + containerId);
releaseNumaResource(containerId);
@@ -268,31 +265,26 @@ public class NumaResourceAllocator {
}
// Check the NUMA nodes for CPU resources
- int cpusRequirement = resource.getVirtualCores();
- Map<String, Integer> cpuAllocations = Maps.newHashMap();
+ int cpusreq = resource.getVirtualCores();
for (int index = 0; index < numaNodesList.size(); index++) {
NumaNodeResource numaNode = numaNodesList
.get((currentAssignNode + index) % numaNodesList.size());
- int cpusRemaining = numaNode.
- assignAvailableCpus(cpusRequirement, containerId);
- cpuAllocations.put(numaNode.getNodeId(), cpusRequirement - cpusRemaining);
- cpusRequirement = cpusRemaining;
- if (cpusRequirement == 0) {
+ int cpusrem = numaNode.assignAvailableCpus(cpusreq, containerId);
+ assignedNumaNodeInfo.addCpuNode(numaNode.getNodeId(), cpusreq - cpusrem);
+ cpusreq = cpusrem;
+ if (cpusreq == 0) {
currentAssignNode = (currentAssignNode + index + 1)
% numaNodesList.size();
break;
}
}
- if (cpusRequirement != 0) {
+ if (cpusreq != 0) {
LOG.info("There are no available cpus:" + resource.getVirtualCores()
+ " in numa nodes for " + containerId);
releaseNumaResource(containerId);
return null;
}
-
- NumaResourceAllocation assignedNumaNodeInfo =
- new NumaResourceAllocation(memoryAllocations, cpuAllocations);
LOG.info("Assigning multiple NUMA nodes ("
+ StringUtils.join(",", assignedNumaNodeInfo.getMemNodes())
+ ") for memory, ("
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index 8de94a5..1d7771a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -1459,7 +1459,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
String keyResChng = CONTAINERS_KEY_PREFIX + container.getContainerId().toString()
+ CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX + resourceType;
try {
- try (WriteBatch batch = db.createWriteBatch()) {
+ WriteBatch batch = db.createWriteBatch();
+ try {
ResourceMappings.AssignedResources res =
new ResourceMappings.AssignedResources();
res.updateAssignedResources(assignedResources);
@@ -1467,6 +1468,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
// New value will overwrite old values for the same key
batch.put(bytes(keyResChng), res.toBytes());
db.write(batch);
+ } finally {
+ batch.close();
}
} catch (DBException e) {
markStoreUnHealthy(e);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index c4c194c..87208f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -75,9 +75,6 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocation;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDevice;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
@@ -1451,7 +1448,7 @@ public class TestNMLeveldbStateStoreService {
@Test
public void testStateStoreForResourceMapping() throws IOException {
- // test that stateStore is initially empty
+ // test empty when no state
List<RecoveredContainerState> recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertTrue(recoveredContainers.isEmpty());
@@ -1467,43 +1464,38 @@ public class TestNMLeveldbStateStoreService {
ResourceMappings resourceMappings = new ResourceMappings();
when(container.getResourceMappings()).thenReturn(resourceMappings);
+ // Store ResourceMapping
stateStore.storeAssignedResources(container, "gpu",
- Arrays.asList(new GpuDevice(1, 1), new GpuDevice(2, 2),
- new GpuDevice(3, 3)));
-
- // This will overwrite the above
- List<Serializable> gpuRes1 = Arrays.asList(
- new GpuDevice(1, 1), new GpuDevice(2, 2), new GpuDevice(4, 4));
+ Arrays.asList("1", "2", "3"));
+ // This will overwrite above
+ List<Serializable> gpuRes1 = Arrays.asList("1", "2", "4");
stateStore.storeAssignedResources(container, "gpu", gpuRes1);
-
- List<Serializable> fpgaRes = Arrays.asList(
- new FpgaDevice("testType", 3, 3, "testIPID"),
- new FpgaDevice("testType", 4, 4, "testIPID"),
- new FpgaDevice("testType", 5, 5, "testIPID"),
- new FpgaDevice("testType", 6, 6, "testIPID"));
+ List<Serializable> fpgaRes = Arrays.asList("3", "4", "5", "6");
stateStore.storeAssignedResources(container, "fpga", fpgaRes);
-
- List<Serializable> numaRes = Arrays.asList(
- new NumaResourceAllocation("testmemNodeId", 2048, "testCpuNodeId", 10));
+ List<Serializable> numaRes = Arrays.asList("numa1");
stateStore.storeAssignedResources(container, "numa", numaRes);
+ // add a invalid key
restartStateStore();
recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
RecoveredContainerState rcs = recoveredContainers.get(0);
- List<Serializable> resources = rcs.getResourceMappings()
+ List<Serializable> res = rcs.getResourceMappings()
.getAssignedResources("gpu");
- Assert.assertEquals(gpuRes1, resources);
- Assert.assertEquals(gpuRes1, resourceMappings.getAssignedResources("gpu"));
-
- resources = rcs.getResourceMappings().getAssignedResources("fpga");
- Assert.assertEquals(fpgaRes, resources);
- Assert.assertEquals(fpgaRes, resourceMappings.getAssignedResources("fpga"));
-
- resources = rcs.getResourceMappings().getAssignedResources("numa");
- Assert.assertEquals(numaRes, resources);
- Assert.assertEquals(numaRes, resourceMappings.getAssignedResources("numa"));
+ Assert.assertTrue(res.equals(gpuRes1));
+ Assert.assertTrue(
+ resourceMappings.getAssignedResources("gpu").equals(gpuRes1));
+
+ res = rcs.getResourceMappings().getAssignedResources("fpga");
+ Assert.assertTrue(res.equals(fpgaRes));
+ Assert.assertTrue(
+ resourceMappings.getAssignedResources("fpga").equals(fpgaRes));
+
+ res = rcs.getResourceMappings().getAssignedResources("numa");
+ Assert.assertTrue(res.equals(numaRes));
+ Assert.assertTrue(
+ resourceMappings.getAssignedResources("numa").equals(numaRes));
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org