You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ad...@apache.org on 2020/07/29 15:50:51 UTC
[hadoop-ozone] branch master updated: HDDS-3809. Make number of
open containers on a datanode a function of no of volumes reported by it.
(#1081)
This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 1346f49 HDDS-3809. Make number of open containers on a datanode a function of no of volumes reported by it. (#1081)
1346f49 is described below
commit 1346f493fa1690358add7bb9f3e5b52545993f36
Author: bshashikant <sh...@apache.org>
AuthorDate: Wed Jul 29 21:20:34 2020 +0530
HDDS-3809. Make number of open containers on a datanode a function of no of volumes reported by it. (#1081)
---
.../common/src/main/resources/ozone-default.xml | 2 +-
.../hdds/scm/container/SCMContainerManager.java | 7 ++-
.../apache/hadoop/hdds/scm/node/DatanodeInfo.java | 22 ++++++++
.../apache/hadoop/hdds/scm/node/NodeManager.java | 2 +
.../hadoop/hdds/scm/node/SCMNodeManager.java | 21 +++++++
.../hadoop/hdds/scm/pipeline/PipelineManager.java | 2 +
.../hdds/scm/pipeline/SCMPipelineManager.java | 10 ++++
.../java/org/apache/hadoop/hdds/scm/TestUtils.java | 33 ++++++-----
.../hadoop/hdds/scm/block/TestBlockManager.java | 66 ++++++++++++++++++++++
.../hadoop/hdds/scm/container/MockNodeManager.java | 11 ++++
.../hadoop/hdds/scm/node/TestSCMNodeManager.java | 51 ++++++++++++++++-
.../testutils/ReplicationNodeManagerMock.java | 5 ++
12 files changed, 214 insertions(+), 18 deletions(-)
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a07807b..b9774aa 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -818,7 +818,7 @@
<name>ozone.scm.pipeline.owner.container.count</name>
<value>3</value>
<tag>OZONE, SCM, PIPELINE</tag>
- <description>Number of containers per owner in a pipeline.
+ <description>Number of containers per owner per disk in a pipeline.
</description>
</property>
<property>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
index 34177f0..e09486e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
@@ -66,7 +66,7 @@ public class SCMContainerManager implements ContainerManager {
private final ContainerStateManager containerStateManager;
- private final int numContainerPerOwnerInPipeline;
+ private final int numContainerPerVolume;
private final SCMContainerManagerMetrics scmContainerManagerMetrics;
@@ -98,7 +98,7 @@ public class SCMContainerManager implements ContainerManager {
this.lock = new ReentrantLock();
this.pipelineManager = pipelineManager;
this.containerStateManager = new ContainerStateManager(conf);
- this.numContainerPerOwnerInPipeline = conf
+ this.numContainerPerVolume = conf
.getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
@@ -432,7 +432,8 @@ public class SCMContainerManager implements ContainerManager {
synchronized (pipeline) {
containerIDs = getContainersForOwner(pipeline, owner);
- if (containerIDs.size() < numContainerPerOwnerInPipeline) {
+ if (containerIDs.size() < numContainerPerVolume * pipelineManager.
+ getNumHealthyVolumes(pipeline)) {
containerInfo =
containerStateManager.allocateContainer(
pipelineManager, owner, pipeline);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
index d06ea2a..b39440f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
@@ -109,6 +109,28 @@ public class DatanodeInfo extends DatanodeDetails {
}
/**
+ * Returns count of healthy volumes reported from datanode.
+ * @return count of healthy volumes
+ */
+ public int getHealthyVolumeCount() {
+ try {
+ lock.readLock().lock();
+ return storageReports.size() - getFailedVolumeCount();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Returns count of failed volumes reported from datanode.
+ * @return count of failed volumes
+ */
+ private int getFailedVolumeCount() {
+ return (int) storageReports.stream().
+ filter(e -> e.hasFailed() ? e.getFailed() : false).count();
+ }
+
+ /**
* Returns the last updated time of datanode info.
* @return the last updated time of datanode info.
*/
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index 37562fe..df21b84 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -213,4 +213,6 @@ public interface NodeManager extends StorageContainerNodeProtocol,
* @return cluster map
*/
NetworkTopology getClusterNetworkTopologyMap();
+
+ int getNumHealthyVolumes(List <DatanodeDetails> dnList);
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 005881c..1a0cec3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors;
@@ -510,6 +511,26 @@ public class SCMNodeManager implements NodeManager {
}
/**
+ * Returns the maximum number of healthy volumes reported out of the set
+ * of datanodes constituting the pipeline.
+ */
+ @Override
+ public int getNumHealthyVolumes(List<DatanodeDetails> dnList) {
+ List<Integer> volumeCountList = new ArrayList<>(dnList.size());
+ for (DatanodeDetails dn : dnList) {
+ try {
+ volumeCountList.add(nodeStateManager.getNode(dn).
+ getHealthyVolumeCount());
+ } catch (NodeNotFoundException e) {
+ LOG.warn("Cannot generate NodeStat, datanode {} not found.",
+ dn.getUuid());
+ }
+ }
+ Preconditions.checkArgument(!volumeCountList.isEmpty());
+ return Collections.max(volumeCountList);
+ }
+
+ /**
* Get set of pipelines a datanode is part of.
*
* @param datanodeDetails - datanodeID
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 48068d8..857f76e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -89,6 +89,8 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean,
void incNumBlocksAllocatedMetric(PipelineID id);
+ int getNumHealthyVolumes(Pipeline pipeline);
+
/**
* Activates a dormant pipeline.
*
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 6fce895..e754059 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -661,6 +661,16 @@ public class SCMPipelineManager implements PipelineManager {
pipelineFactory.shutdown();
}
+ /**
+ * Returns the max count of healthy volumes from the set of
+ * datanodes constituting the pipeline.
+ * @param pipeline
+ * @return healthy volume count
+ */
+ public int getNumHealthyVolumes(Pipeline pipeline) {
+ return nodeManager.getNumHealthyVolumes(pipeline.getNodes());
+ }
+
protected ReadWriteLock getLock() {
return lock;
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 03ed0f7..f4f1759 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -222,20 +222,26 @@ public final class TestUtils {
StorageTypeProto.DISK);
}
- /**
- * Creates storage report with the given information.
- *
- * @param nodeId datanode id
- * @param path storage dir
- * @param capacity storage size
- * @param used space used
- * @param remaining space remaining
- * @param type type of storage
- *
- * @return StorageReportProto
- */
public static StorageReportProto createStorageReport(UUID nodeId, String path,
- long capacity, long used, long remaining, StorageTypeProto type) {
+ long capacity, long used, long remaining, StorageTypeProto type) {
+ return createStorageReport(nodeId, path, capacity, used, remaining,
+ type, false);
+ }
+ /**
+ * Creates storage report with the given information.
+ *
+ * @param nodeId datanode id
+ * @param path storage dir
+ * @param capacity storage size
+ * @param used space used
+ * @param remaining space remaining
+ * @param type type of storage
+ *
+ * @return StorageReportProto
+ */
+ public static StorageReportProto createStorageReport(UUID nodeId, String path,
+ long capacity, long used, long remaining, StorageTypeProto type,
+ boolean failed) {
Preconditions.checkNotNull(nodeId);
Preconditions.checkNotNull(path);
StorageReportProto.Builder srb = StorageReportProto.newBuilder();
@@ -243,6 +249,7 @@ public final class TestUtils {
.setStorageLocation(path)
.setCapacity(capacity)
.setScmUsed(used)
+ .setFailed(failed)
.setRemaining(remaining);
StorageTypeProto storageTypeProto =
type == null ? StorageTypeProto.DISK : type;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index e0ba53c..a72031c 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -298,6 +298,72 @@ public class TestBlockManager {
}
}
+
+ @Test
+ public void testBlockDistributionWithMultipleDisks() throws Exception {
+ int threadCount = numContainerPerOwnerInPipeline *
+ numContainerPerOwnerInPipeline;
+ nodeManager.setNumHealthyVolumes(numContainerPerOwnerInPipeline);
+ List<ExecutorService> executors = new ArrayList<>(threadCount);
+ for (int i = 0; i < threadCount; i++) {
+ executors.add(Executors.newSingleThreadExecutor());
+ }
+ pipelineManager.createPipeline(type, factor);
+ TestUtils.openAllRatisPipelines(pipelineManager);
+ Map<Long, List<AllocatedBlock>> allocatedBlockMap =
+ new ConcurrentHashMap<>();
+ List<CompletableFuture<AllocatedBlock>> futureList =
+ new ArrayList<>(threadCount);
+ for (int i = 0; i < threadCount; i++) {
+ final CompletableFuture<AllocatedBlock> future =
+ new CompletableFuture<>();
+ CompletableFuture.supplyAsync(() -> {
+ try {
+ List<AllocatedBlock> blockList;
+ AllocatedBlock block = blockManager
+ .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor,
+ OzoneConsts.OZONE,
+ new ExcludeList());
+ long containerId = block.getBlockID().getContainerID();
+ if (!allocatedBlockMap.containsKey(containerId)) {
+ blockList = new ArrayList<>();
+ } else {
+ blockList = allocatedBlockMap.get(containerId);
+ }
+ blockList.add(block);
+ allocatedBlockMap.put(containerId, blockList);
+ future.complete(block);
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+ return future;
+ }, executors.get(i));
+ futureList.add(future);
+ }
+ try {
+ CompletableFuture
+ .allOf(futureList.toArray(
+ new CompletableFuture[futureList.size()])).get();
+ Assert.assertTrue(
+ pipelineManager.getPipelines(type).size() == 1);
+ Pipeline pipeline = pipelineManager.getPipelines(type).get(0);
+ // total no of containers to be created will be number of healthy
+ // volumes * number of numContainerPerOwnerInPipeline which is equal to
+ // the thread count
+ Assert.assertTrue(threadCount == pipelineManager.
+ getNumberOfContainers(pipeline.getId()));
+ Assert.assertTrue(
+ allocatedBlockMap.size() == threadCount);
+ Assert.assertTrue(allocatedBlockMap.
+ values().size() == threadCount);
+ allocatedBlockMap.values().stream().forEach(v -> {
+ Assert.assertTrue(v.size() == 1);
+ });
+ } catch (Exception e) {
+ Assert.fail("testAllocateBlockInParallel failed");
+ }
+ }
+
@Test
public void testAllocateOversizedBlock() throws Exception {
long size = 6 * GB;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 54f6ee4..5b635a7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -92,6 +92,7 @@ public class MockNodeManager implements NodeManager {
private final Node2ContainerMap node2ContainerMap;
private NetworkTopology clusterMap;
private ConcurrentMap<String, Set<String>> dnsToUuidMap;
+ private int numHealthyDisksPerDatanode;
public MockNodeManager(NetworkTopologyImpl clusterMap,
List<DatanodeDetails> nodes,
@@ -121,6 +122,7 @@ public class MockNodeManager implements NodeManager {
}
safemode = false;
this.commandMap = new HashMap<>();
+ numHealthyDisksPerDatanode = 1;
}
public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
@@ -569,6 +571,15 @@ public class MockNodeManager implements NodeManager {
this.clusterMap = topology;
}
+ @Override
+ public int getNumHealthyVolumes(List<DatanodeDetails> dnList) {
+ return numHealthyDisksPerDatanode;
+ }
+
+ public void setNumHealthyVolumes(int value) {
+ numHealthyDisksPerDatanode = value;
+ }
+
/**
* A class to declare some values for the nodes so that our tests
* won't fail.
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index df5cb2d..7a58d46 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -848,11 +848,12 @@ public class TestSCMNodeManager {
final long capacity = 2000;
final long used = 100;
final long remaining = capacity - used;
-
+ List<DatanodeDetails> dnList = new ArrayList<>(nodeCount);
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
EventQueue eventQueue = (EventQueue) scm.getEventQueue();
for (int x = 0; x < nodeCount; x++) {
DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
+ dnList.add(dn);
UUID dnId = dn.getUuid();
long free = capacity - used;
String storagePath = testDir.getAbsolutePath() + "/" + dnId;
@@ -871,9 +872,57 @@ public class TestSCMNodeManager {
.getScmUsed().get());
assertEquals(remaining * nodeCount, (long) nodeManager.getStats()
.getRemaining().get());
+ assertEquals(1, nodeManager.getNumHealthyVolumes(dnList));
+ dnList.clear();
+ }
+ }
+
+ /**
+ * Test multiple nodes sending initial heartbeat with their node report
+ * with multiple volumes.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void testVolumeInfoFromNodeReport()
+ throws IOException, InterruptedException, AuthenticationException {
+ OzoneConfiguration conf = getConf();
+ conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 1000,
+ MILLISECONDS);
+ final int volumeCount = 10;
+ final long capacity = 2000;
+ final long used = 100;
+ List<DatanodeDetails> dnList = new ArrayList<>(1);
+ try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+ EventQueue eventQueue = (EventQueue) scm.getEventQueue();
+ DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
+ dnList.add(dn);
+ UUID dnId = dn.getUuid();
+ long free = capacity - used;
+ List<StorageReportProto> reports = new ArrayList<>(volumeCount);
+ boolean failed = true;
+ for (int x = 0; x < volumeCount; x++) {
+ String storagePath = testDir.getAbsolutePath() + "/" + dnId;
+ reports.add(TestUtils
+ .createStorageReport(dnId, storagePath, capacity,
+ used, free, null, failed));
+ failed = !failed;
+ }
+ nodeManager.register(dn, TestUtils.createNodeReport(reports), null);
+ nodeManager.processHeartbeat(dn);
+ //TODO: wait for EventQueue to be processed
+ eventQueue.processAll(8000L);
+
+ assertEquals(1, nodeManager.getNodeCount(HEALTHY));
+ assertEquals(volumeCount / 2,
+ nodeManager.getNumHealthyVolumes(dnList));
+ dnList.clear();
}
}
+
/**
* Test single node stat update based on nodereport from different heartbeat
* status (healthy, stale and dead).
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 9ca3f18..a9b879f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -343,4 +343,9 @@ public class ReplicationNodeManagerMock implements NodeManager {
public NetworkTopology getClusterNetworkTopologyMap() {
return null;
}
+
+ @Override
+ public int getNumHealthyVolumes(List<DatanodeDetails> dnList) {
+ return 0;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org