You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by na...@apache.org on 2020/06/13 16:54:54 UTC
[hadoop-ozone] branch master updated: HDDS-3778. Block distribution in a pipeline among open containers is not uniform. (#1058)
This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 563507c HDDS-3778. Block distribution in a pipeline among open containers is not uniform. (#1058)
563507c is described below
commit 563507cf2501286b2b3ed6d9e816ad7d9bcd46f4
Author: bshashikant <sh...@apache.org>
AuthorDate: Sat Jun 13 22:24:46 2020 +0530
HDDS-3778. Block distribution in a pipeline among open containers is not uniform. (#1058)
---
.../hdds/scm/container/SCMContainerManager.java | 49 +++++++++--------
.../hadoop/hdds/scm/block/TestBlockManager.java | 63 +++++++++++++++++++++-
2 files changed, 85 insertions(+), 27 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
index 9f47608..7ac90fc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
@@ -422,44 +422,43 @@ public class SCMContainerManager implements ContainerManager {
@SuppressWarnings("squid:S2445")
public ContainerInfo getMatchingContainer(final long sizeRequired,
- String owner, Pipeline pipeline, List<ContainerID> excludedContainers) {
+ String owner, Pipeline pipeline,
+ List<ContainerID>
+ excludedContainers) {
NavigableSet<ContainerID> containerIDs;
+ ContainerInfo containerInfo;
try {
synchronized (pipeline) {
containerIDs = getContainersForOwner(pipeline, owner);
if (containerIDs.size() < numContainerPerOwnerInPipeline) {
- ContainerInfo containerInfo =
- containerStateManager.allocateContainer(pipelineManager, owner,
- pipeline);
- // Add to DB
- addContainerToDB(containerInfo);
- containerStateManager.updateLastUsedMap(pipeline.getId(),
- containerInfo.containerID(), owner);
- return containerInfo;
- }
- }
-
- containerIDs.removeAll(excludedContainers);
- ContainerInfo containerInfo =
- containerStateManager.getMatchingContainer(sizeRequired, owner,
- pipeline.getId(), containerIDs);
- if (containerInfo == null) {
- synchronized (pipeline) {
containerInfo =
- containerStateManager.allocateContainer(pipelineManager, owner,
- pipeline);
+ containerStateManager.allocateContainer(
+ pipelineManager, owner, pipeline);
// Add to DB
addContainerToDB(containerInfo);
+ } else {
+ containerIDs.removeAll(excludedContainers);
+ containerInfo =
+ containerStateManager.getMatchingContainer(
+ sizeRequired, owner, pipeline.getId(), containerIDs);
+ if (containerInfo == null) {
+ containerInfo =
+ containerStateManager.
+ allocateContainer(pipelineManager, owner,
+ pipeline);
+ // Add to DB
+ addContainerToDB(containerInfo);
+ }
}
+ containerStateManager.updateLastUsedMap(pipeline.getId(),
+ containerInfo.containerID(), owner);
+ // TODO: #CLUTIL cleanup entries in lastUsedMap
+ return containerInfo;
}
- containerStateManager.updateLastUsedMap(pipeline.getId(),
- containerInfo.containerID(), owner);
- // TODO: #CLUTIL cleanup entries in lastUsedMap
- return containerInfo;
} catch (Exception e) {
LOG.warn("Container allocation failed for pipeline={} requiredSize={} {}",
- pipeline, sizeRequired, e);
+ pipeline, sizeRequired, e);
return null;
}
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index 955efd3..c2ad264 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -18,13 +18,15 @@
package org.apache.hadoop.hdds.scm.block;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -238,6 +240,63 @@ public class TestBlockManager {
}
@Test
+ public void testBlockDistribution() throws Exception {
+ int threadCount = numContainerPerOwnerInPipeline *
+ numContainerPerOwnerInPipeline;
+ List<ExecutorService> executors = new ArrayList<>(threadCount);
+ for (int i = 0; i < threadCount; i++) {
+ executors.add(Executors.newSingleThreadExecutor());
+ }
+ pipelineManager.createPipeline(type, factor);
+ TestUtils.openAllRatisPipelines(pipelineManager);
+ Map<Long, List<AllocatedBlock>> allocatedBlockMap =
+ new ConcurrentHashMap<>();
+ List<CompletableFuture<AllocatedBlock>> futureList =
+ new ArrayList<>(threadCount);
+ for (int i = 0; i < threadCount; i++) {
+ final CompletableFuture<AllocatedBlock> future =
+ new CompletableFuture<>();
+ CompletableFuture.supplyAsync(() -> {
+ try {
+ List<AllocatedBlock> blockList;
+ AllocatedBlock block = blockManager
+ .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor,
+ OzoneConsts.OZONE,
+ new ExcludeList());
+ long containerId = block.getBlockID().getContainerID();
+ if (!allocatedBlockMap.containsKey(containerId)) {
+ blockList = new ArrayList<>();
+ } else {
+ blockList = allocatedBlockMap.get(containerId);
+ }
+ blockList.add(block);
+ allocatedBlockMap.put(containerId, blockList);
+ future.complete(block);
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+ return future;
+ }, executors.get(i));
+ futureList.add(future);
+ }
+ try {
+ CompletableFuture
+ .allOf(futureList.toArray(
+ new CompletableFuture[futureList.size()])).get();
+ Assert.assertTrue(pipelineManager.getPipelines(type).size() == 1);
+ Assert.assertTrue(
+ allocatedBlockMap.size() == numContainerPerOwnerInPipeline);
+ Assert.assertTrue(allocatedBlockMap.
+ values().size() == numContainerPerOwnerInPipeline);
+ allocatedBlockMap.values().stream().forEach(v -> {
+ Assert.assertTrue(v.size() == numContainerPerOwnerInPipeline);
+ });
+ } catch (Exception e) {
+ Assert.fail("testAllocateBlockInParallel failed");
+ }
+ }
+
+ @Test
public void testAllocateOversizedBlock() throws Exception {
long size = 6 * GB;
thrown.expectMessage("Unsupported block size");
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org