Posted to commits@ozone.apache.org by ad...@apache.org on 2020/07/29 15:50:51 UTC

[hadoop-ozone] branch master updated: HDDS-3809. Make number of open containers on a datanode a function of no of volumes reported by it. (#1081)

This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 1346f49  HDDS-3809. Make number of open containers on a datanode a function of no of volumes reported by it. (#1081)
1346f49 is described below

commit 1346f493fa1690358add7bb9f3e5b52545993f36
Author: bshashikant <sh...@apache.org>
AuthorDate: Wed Jul 29 21:20:34 2020 +0530

    HDDS-3809. Make number of open containers on a datanode a function of no of volumes reported by it. (#1081)
---
 .../common/src/main/resources/ozone-default.xml    |  2 +-
 .../hdds/scm/container/SCMContainerManager.java    |  7 ++-
 .../apache/hadoop/hdds/scm/node/DatanodeInfo.java  | 22 ++++++++
 .../apache/hadoop/hdds/scm/node/NodeManager.java   |  2 +
 .../hadoop/hdds/scm/node/SCMNodeManager.java       | 21 +++++++
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |  2 +
 .../hdds/scm/pipeline/SCMPipelineManager.java      | 10 ++++
 .../java/org/apache/hadoop/hdds/scm/TestUtils.java | 33 ++++++-----
 .../hadoop/hdds/scm/block/TestBlockManager.java    | 66 ++++++++++++++++++++++
 .../hadoop/hdds/scm/container/MockNodeManager.java | 11 ++++
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   | 51 ++++++++++++++++-
 .../testutils/ReplicationNodeManagerMock.java      |  5 ++
 12 files changed, 214 insertions(+), 18 deletions(-)
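
The gist of the change: the number of open containers SCM allows per
owner in a pipeline is no longer the flat
ozone.scm.pipeline.owner.container.count value, but that value
multiplied by the maximum number of healthy volumes reported by the
datanodes in the pipeline, so nodes with more disks get proportionally
more open containers. A condensed sketch of the new allocation check
(paraphrased from the SCMContainerManager hunk below; the helper
wiring is simplified):

    // numContainerPerVolume  = ozone.scm.pipeline.owner.container.count
    // getNumHealthyVolumes() = max, over the pipeline's datanodes, of
    //                          (reported volumes - failed volumes)
    int cap = numContainerPerVolume
        * pipelineManager.getNumHealthyVolumes(pipeline);
    if (getContainersForOwner(pipeline, owner).size() < cap) {
      containerInfo = containerStateManager
          .allocateContainer(pipelineManager, owner, pipeline);
    }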

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a07807b..b9774aa 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -818,7 +818,7 @@
     <name>ozone.scm.pipeline.owner.container.count</name>
     <value>3</value>
     <tag>OZONE, SCM, PIPELINE</tag>
-    <description>Number of containers per owner in a pipeline.
+    <description>Number of containers per owner per disk in a pipeline.
     </description>
   </property>
   <property>
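
Under the new semantics, the cap scales with disk count: keeping the
default value of 3, a pipeline whose healthiest datanode reports 8
working volumes may hold up to 3 * 8 = 24 open containers per owner,
while a pipeline of single-volume datanodes keeps the old cap of 3.
(The volume counts in this example are illustrative, not taken from
the commit.)
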
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
index 34177f0..e09486e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
@@ -66,7 +66,7 @@ public class SCMContainerManager implements ContainerManager {
 
   private final ContainerStateManager containerStateManager;
 
-  private final int numContainerPerOwnerInPipeline;
+  private final int numContainerPerVolume;
 
   private final SCMContainerManagerMetrics scmContainerManagerMetrics;
 
@@ -98,7 +98,7 @@ public class SCMContainerManager implements ContainerManager {
     this.lock = new ReentrantLock();
     this.pipelineManager = pipelineManager;
     this.containerStateManager = new ContainerStateManager(conf);
-    this.numContainerPerOwnerInPipeline = conf
+    this.numContainerPerVolume = conf
         .getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
             ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
 
@@ -432,7 +432,8 @@ public class SCMContainerManager implements ContainerManager {
       synchronized (pipeline) {
         containerIDs = getContainersForOwner(pipeline, owner);
 
-        if (containerIDs.size() < numContainerPerOwnerInPipeline) {
+        if (containerIDs.size() < numContainerPerVolume
+                * pipelineManager.getNumHealthyVolumes(pipeline)) {
           containerInfo =
                   containerStateManager.allocateContainer(
                           pipelineManager, owner, pipeline);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
index d06ea2a..b39440f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
@@ -109,6 +109,28 @@ public class DatanodeInfo extends DatanodeDetails {
   }
 
   /**
+   * Returns the count of healthy volumes reported by the datanode.
+   * @return count of healthy volumes
+   */
+  public int getHealthyVolumeCount() {
+    lock.readLock().lock();
+    try {
+      return storageReports.size() - getFailedVolumeCount();
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Returns the count of failed volumes reported by the datanode.
+   * @return count of failed volumes
+   */
+  private int getFailedVolumeCount() {
+    return (int) storageReports.stream()
+            .filter(e -> e.hasFailed() && e.getFailed()).count();
+  }
+
+  /**
    * Returns the last updated time of datanode info.
    * @return the last updated time of datanode info.
    */
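
A note on the protobuf accessors used in getFailedVolumeCount() above:
hasFailed() reports whether the optional field was set at all, while
getFailed() returns its value, so a storage report that never set the
field counts as healthy. A minimal, self-contained illustration of the
same counting rule (the Report class below is a stand-in assumption,
not the real StorageReportProto):

    import java.util.Arrays;
    import java.util.List;

    public class VolumeCountSketch {
      // Stand-in for StorageReportProto: models an optional boolean field.
      static final class Report {
        final boolean hasFailed;  // was the 'failed' field set at all?
        final boolean failed;     // its value, meaningful only when set
        Report(boolean hasFailed, boolean failed) {
          this.hasFailed = hasFailed;
          this.failed = failed;
        }
      }

      static int healthyVolumeCount(List<Report> reports) {
        // A volume counts as failed only if the field is present AND true;
        // a report with the field unset is treated as a healthy volume.
        long failedCount = reports.stream()
            .filter(r -> r.hasFailed && r.failed)
            .count();
        return reports.size() - (int) failedCount;
      }

      public static void main(String[] args) {
        List<Report> reports = Arrays.asList(
            new Report(true, true),     // explicitly failed
            new Report(true, false),    // explicitly healthy
            new Report(false, false));  // field unset: healthy by default
        System.out.println(healthyVolumeCount(reports));  // prints 2
      }
    }
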
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index 37562fe..df21b84 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -213,4 +213,6 @@ public interface NodeManager extends StorageContainerNodeProtocol,
    * @return cluster map
    */
   NetworkTopology getClusterNetworkTopologyMap();
+
+  int getNumHealthyVolumes(List<DatanodeDetails> dnList);
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 005881c..1a0cec3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.Collections;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledFuture;
 import java.util.stream.Collectors;
@@ -510,6 +511,26 @@ public class SCMNodeManager implements NodeManager {
   }
 
   /**
+   * Returns the maximum number of healthy volumes reported across the
+   * set of datanodes constituting the pipeline.
+   */
+  @Override
+  public int getNumHealthyVolumes(List<DatanodeDetails> dnList) {
+    List<Integer> volumeCountList = new ArrayList<>(dnList.size());
+    for (DatanodeDetails dn : dnList) {
+      try {
+        volumeCountList.add(nodeStateManager.getNode(dn)
+                .getHealthyVolumeCount());
+      } catch (NodeNotFoundException e) {
+        LOG.warn("Cannot fetch healthy volume count, datanode {} not found.",
+                dn.getUuid());
+      }
+    }
+    Preconditions.checkArgument(!volumeCountList.isEmpty());
+    return Collections.max(volumeCountList);
+  }
+
+  /**
    * Get set of pipelines a datanode is part of.
    *
    * @param datanodeDetails - datanodeID
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 48068d8..857f76e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -89,6 +89,8 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean,
 
   void incNumBlocksAllocatedMetric(PipelineID id);
 
+  int getNumHealthyVolumes(Pipeline pipeline);
+
   /**
    * Activates a dormant pipeline.
    *
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 6fce895..e754059 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -661,6 +661,16 @@ public class SCMPipelineManager implements PipelineManager {
     pipelineFactory.shutdown();
   }
 
+  /**
+   * Returns the max count of healthy volumes across the set of
+   * datanodes constituting the pipeline.
+   * @param pipeline the pipeline whose datanodes are inspected
+   * @return healthy volume count
+   */
+  public int getNumHealthyVolumes(Pipeline pipeline) {
+    return nodeManager.getNumHealthyVolumes(pipeline.getNodes());
+  }
+
   protected ReadWriteLock getLock() {
     return lock;
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 03ed0f7..f4f1759 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -222,20 +222,26 @@ public final class TestUtils {
         StorageTypeProto.DISK);
   }
 
-  /**
-   * Creates storage report with the given information.
-   *
-   * @param nodeId    datanode id
-   * @param path      storage dir
-   * @param capacity  storage size
-   * @param used      space used
-   * @param remaining space remaining
-   * @param type      type of storage
-   *
-   * @return StorageReportProto
-   */
   public static StorageReportProto createStorageReport(UUID nodeId, String path,
-      long capacity, long used, long remaining, StorageTypeProto type) {
+       long capacity, long used, long remaining, StorageTypeProto type) {
+    return createStorageReport(nodeId, path, capacity, used, remaining,
+            type, false);
+  }
+  /**
+   * Creates storage report with the given information.
+   *
+   * @param nodeId    datanode id
+   * @param path      storage dir
+   * @param capacity  storage size
+   * @param used      space used
+   * @param remaining space remaining
+   * @param type      type of storage
+   * @param failed    whether the storage volume has failed
+   * @return StorageReportProto
+   */
+  public static StorageReportProto createStorageReport(UUID nodeId, String path,
+      long capacity, long used, long remaining, StorageTypeProto type,
+      boolean failed) {
     Preconditions.checkNotNull(nodeId);
     Preconditions.checkNotNull(path);
     StorageReportProto.Builder srb = StorageReportProto.newBuilder();
@@ -243,6 +249,7 @@ public final class TestUtils {
         .setStorageLocation(path)
         .setCapacity(capacity)
         .setScmUsed(used)
+        .setFailed(failed)
         .setRemaining(remaining);
     StorageTypeProto storageTypeProto =
         type == null ? StorageTypeProto.DISK : type;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index e0ba53c..a72031c 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -298,6 +298,72 @@ public class TestBlockManager {
     }
   }
 
+
+  @Test
+  public void testBlockDistributionWithMultipleDisks() throws Exception {
+    int threadCount = numContainerPerOwnerInPipeline *
+            numContainerPerOwnerInPipeline;
+    nodeManager.setNumHealthyVolumes(numContainerPerOwnerInPipeline);
+    List<ExecutorService> executors = new ArrayList<>(threadCount);
+    for (int i = 0; i < threadCount; i++) {
+      executors.add(Executors.newSingleThreadExecutor());
+    }
+    pipelineManager.createPipeline(type, factor);
+    TestUtils.openAllRatisPipelines(pipelineManager);
+    Map<Long, List<AllocatedBlock>> allocatedBlockMap =
+            new ConcurrentHashMap<>();
+    List<CompletableFuture<AllocatedBlock>> futureList =
+            new ArrayList<>(threadCount);
+    for (int i = 0; i < threadCount; i++) {
+      final CompletableFuture<AllocatedBlock> future =
+              new CompletableFuture<>();
+      CompletableFuture.supplyAsync(() -> {
+        try {
+          AllocatedBlock block = blockManager
+                  .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor,
+                          OzoneConsts.OZONE,
+                          new ExcludeList());
+          long containerId = block.getBlockID().getContainerID();
+          // computeIfAbsent performs an atomic get-or-create on the
+          // ConcurrentHashMap, avoiding the check-then-act race that
+          // a containsKey()/put() sequence would leave between the
+          // allocating threads; each thread is expected to land on a
+          // distinct container, so the per-list add is uncontended
+          List<AllocatedBlock> blockList = allocatedBlockMap
+                  .computeIfAbsent(containerId, k -> new ArrayList<>());
+          blockList.add(block);
+          future.complete(block);
+        } catch (IOException e) {
+          future.completeExceptionally(e);
+        }
+        return future;
+      }, executors.get(i));
+      futureList.add(future);
+    }
+    try {
+      CompletableFuture
+              .allOf(futureList.toArray(
+                      new CompletableFuture[futureList.size()])).get();
+      Assert.assertEquals(1,
+              pipelineManager.getPipelines(type).size());
+      Pipeline pipeline = pipelineManager.getPipelines(type).get(0);
+      // the total number of containers created will be the number of
+      // healthy volumes * numContainerPerOwnerInPipeline, which is
+      // equal to the thread count
+      Assert.assertEquals(threadCount, pipelineManager
+              .getNumberOfContainers(pipeline.getId()));
+      Assert.assertEquals(threadCount,
+              allocatedBlockMap.size());
+      Assert.assertEquals(threadCount,
+              allocatedBlockMap.values().size());
+      allocatedBlockMap.values().forEach(v -> {
+        Assert.assertEquals(1, v.size());
+      });
+    } catch (Exception e) {
+      Assert.fail("testBlockDistributionWithMultipleDisks failed: " + e);
+    }
+  }
+
   @Test
   public void testAllocateOversizedBlock() throws Exception {
     long size = 6 * GB;
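
To make the test's arithmetic concrete: if numContainerPerOwnerInPipeline
is 3 (an illustrative value; the real one comes from the test
configuration), the mock node manager reports 3 healthy volumes, the
pipeline may therefore hold 3 * 3 = 9 open containers, and each of the
9 = 3 * 3 allocating threads lands its block in a distinct container.
That is why the assertions expect threadCount containers, threadCount
map entries, and exactly one block per container.
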
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 54f6ee4..5b635a7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -92,6 +92,7 @@ public class MockNodeManager implements NodeManager {
   private final Node2ContainerMap node2ContainerMap;
   private NetworkTopology clusterMap;
   private ConcurrentMap<String, Set<String>> dnsToUuidMap;
+  private int numHealthyDisksPerDatanode;
 
   public MockNodeManager(NetworkTopologyImpl clusterMap,
                          List<DatanodeDetails> nodes,
@@ -121,6 +122,7 @@ public class MockNodeManager implements NodeManager {
     }
     safemode = false;
     this.commandMap = new HashMap<>();
+    numHealthyDisksPerDatanode = 1;
   }
 
   public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
@@ -569,6 +571,15 @@ public class MockNodeManager implements NodeManager {
     this.clusterMap = topology;
   }
 
+  @Override
+  public int getNumHealthyVolumes(List<DatanodeDetails> dnList) {
+    return numHealthyDisksPerDatanode;
+  }
+
+  public void setNumHealthyVolumes(int value) {
+    numHealthyDisksPerDatanode = value;
+  }
+
   /**
    * A class to declare some values for the nodes so that our tests
    * won't fail.
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index df5cb2d..7a58d46 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -848,11 +848,12 @@ public class TestSCMNodeManager {
     final long capacity = 2000;
     final long used = 100;
     final long remaining = capacity - used;
-
+    List<DatanodeDetails> dnList = new ArrayList<>(nodeCount);
     try (SCMNodeManager nodeManager = createNodeManager(conf)) {
       EventQueue eventQueue = (EventQueue) scm.getEventQueue();
       for (int x = 0; x < nodeCount; x++) {
         DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
+        dnList.add(dn);
         UUID dnId = dn.getUuid();
         long free = capacity - used;
         String storagePath = testDir.getAbsolutePath() + "/" + dnId;
@@ -871,9 +872,57 @@ public class TestSCMNodeManager {
           .getScmUsed().get());
       assertEquals(remaining * nodeCount, (long) nodeManager.getStats()
           .getRemaining().get());
+      assertEquals(1, nodeManager.getNumHealthyVolumes(dnList));
+      dnList.clear();
+    }
+  }
+
+  /**
+   * Test a single node sending its initial heartbeat with a node report
+   * covering multiple volumes, half of which have failed.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws AuthenticationException
+   */
+  @Test
+  public void testVolumeInfoFromNodeReport()
+          throws IOException, InterruptedException, AuthenticationException {
+    OzoneConfiguration conf = getConf();
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 1000,
+            MILLISECONDS);
+    final int volumeCount = 10;
+    final long capacity = 2000;
+    final long used = 100;
+    List<DatanodeDetails> dnList = new ArrayList<>(1);
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      EventQueue eventQueue = (EventQueue) scm.getEventQueue();
+      DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
+      dnList.add(dn);
+      UUID dnId = dn.getUuid();
+      long free = capacity - used;
+      List<StorageReportProto> reports = new ArrayList<>(volumeCount);
+      boolean failed = true;
+      for (int x = 0; x < volumeCount; x++) {
+        String storagePath = testDir.getAbsolutePath() + "/" + dnId;
+        reports.add(TestUtils
+                .createStorageReport(dnId, storagePath, capacity,
+                        used, free, null, failed));
+        failed = !failed;
+      }
+      nodeManager.register(dn, TestUtils.createNodeReport(reports), null);
+      nodeManager.processHeartbeat(dn);
+      // let the EventQueue process the registration and heartbeat events
+      eventQueue.processAll(8000L);
+
+      assertEquals(1, nodeManager.getNodeCount(HEALTHY));
+      assertEquals(volumeCount / 2,
+              nodeManager.getNumHealthyVolumes(dnList));
+      dnList.clear();
     }
   }
 
+
   /**
    * Test single node stat update based on nodereport from different heartbeat
    * status (healthy, stale and dead).
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 9ca3f18..a9b879f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -343,4 +343,9 @@ public class ReplicationNodeManagerMock implements NodeManager {
   public NetworkTopology getClusterNetworkTopologyMap() {
     return null;
   }
+
+  @Override
+  public int getNumHealthyVolumes(List<DatanodeDetails> dnList) {
+    return 0;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org