You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sa...@apache.org on 2021/06/16 02:25:15 UTC

[ozone] branch master updated: HDDS-5268. Ensure disk checker also scans the ratis log disks periodically (#2290)

This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cf7d31  HDDS-5268. Ensure disk checker also scans the ratis log disks periodically (#2290)
9cf7d31 is described below

commit 9cf7d313dfbd613a9faead6dc525fafe8d8b2fa3
Author: Gui Hecheng <ma...@tencent.com>
AuthorDate: Wed Jun 16 10:24:56 2021 +0800

    HDDS-5268. Ensure disk checker also scans the ratis log disks periodically (#2290)
---
 .../apache/hadoop/hdds/DFSConfigKeysLegacy.java    |  12 -
 .../apache/hadoop/ozone/HddsDatanodeService.java   |   7 +-
 .../common/impl/StorageLocationReport.java         |  55 +++-
 .../common/statemachine/DatanodeConfiguration.java | 115 ++++++-
 .../common/statemachine/DatanodeStateMachine.java  |   4 +-
 .../states/endpoint/VersionEndpointTask.java       |  14 +-
 .../transport/server/ratis/XceiverServerRatis.java |  51 +--
 .../container/common/utils/HddsVolumeUtil.java     |  11 -
 .../container/common/utils/StorageVolumeUtil.java  |  51 +++
 .../ozone/container/common/volume/HddsVolume.java  | 175 ++---------
 .../container/common/volume/HddsVolumeFactory.java |  93 ++++++
 .../common/volume/ImmutableVolumeSet.java          |   8 +-
 ...ImmutableVolumeSet.java => MetadataVolume.java} |  57 ++--
 .../common/volume/MetadataVolumeFactory.java       |  56 ++++
 .../container/common/volume/MutableVolumeSet.java  | 344 ++++++++-------------
 .../container/common/volume/StorageVolume.java     | 214 +++++++++++++
 ...olumeChecker.java => StorageVolumeChecker.java} | 138 ++++-----
 .../common/volume/StorageVolumeFactory.java        |  60 ++++
 .../ozone/container/common/volume/VolumeSet.java   |   4 +-
 .../container/keyvalue/KeyValueContainer.java      |   8 +-
 .../ozone/container/keyvalue/KeyValueHandler.java  |   6 +-
 .../container/keyvalue/helpers/BlockUtils.java     |   2 +-
 .../container/keyvalue/helpers/ChunkUtils.java     |   2 +-
 .../keyvalue/impl/FilePerBlockStrategy.java        |   2 +-
 .../ozone/container/ozoneimpl/OzoneContainer.java  |  49 ++-
 .../container/common/TestBlockDeletingService.java |   6 +-
 .../TestSchemaOneBackwardsCompatibility.java       |  15 +-
 .../common/impl/TestContainerPersistence.java      |   6 +-
 .../container/common/impl/TestHddsDispatcher.java  |  11 +-
 .../statemachine/TestDatanodeConfiguration.java    |  55 +++-
 ...eChecker.java => TestStorageVolumeChecker.java} |  40 +--
 .../container/common/volume/TestVolumeSet.java     |  48 +--
 .../common/volume/TestVolumeSetDiskChecks.java     |  70 +++--
 .../keyvalue/TestKeyValueBlockIterator.java        |   6 +-
 .../container/keyvalue/TestKeyValueContainer.java  |   9 +-
 .../keyvalue/TestKeyValueContainerCheck.java       |   6 +-
 .../container/keyvalue/TestKeyValueHandler.java    |   5 +-
 .../container/ozoneimpl/TestContainerReader.java   |  11 +-
 .../container/ozoneimpl/TestOzoneContainer.java    |  16 +-
 .../apache/hadoop/hdds/utils/HddsServerUtil.java   |  16 +
 .../proto/ScmServerDatanodeHeartbeatProtocol.proto |   4 +
 .../ozone/container/common/TestEndPoint.java       |   5 +-
 .../container/metrics/TestContainerMetrics.java    |  11 +-
 .../server/TestSecureContainerServer.java          |   6 +-
 .../apache/hadoop/ozone/dn/DatanodeTestUtils.java  |  19 +-
 .../ozone/dn/TestDatanodeLayoutUpgradeTool.java    |   4 +-
 .../TestDatanodeHddsVolumeFailureDetection.java    |  22 +-
 .../TestDatanodeHddsVolumeFailureToleration.java   |  16 +-
 .../apache/hadoop/ozone/debug/DatanodeLayout.java  |   8 +-
 .../ozone/debug/container/ContainerCommands.java   |  14 +-
 .../hadoop/ozone/freon/ChunkManagerDiskWrite.java  |   4 +-
 .../ozone/freon/ClosedContainerReplicator.java     |   6 +-
 .../containergenerator/GeneratorDatanode.java      |   7 +-
 .../ozone/genesis/BenchMarkDatanodeDispatcher.java |   4 +-
 54 files changed, 1265 insertions(+), 723 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DFSConfigKeysLegacy.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DFSConfigKeysLegacy.java
index 1e6d73f..6be55ee 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DFSConfigKeysLegacy.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DFSConfigKeysLegacy.java
@@ -72,18 +72,6 @@ public final class DFSConfigKeysLegacy {
   public static final String DFS_DATANODE_KEYTAB_FILE_KEY =
       "dfs.datanode.keytab.file";
 
-  public static final String DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY =
-      "dfs.datanode.disk.check.min.gap";
-
-  public static final String DFS_DATANODE_DISK_CHECK_MIN_GAP_DEFAULT =
-      "15m";
-
-  public static final String DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY =
-      "dfs.datanode.disk.check.timeout";
-
-  public static final String DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT =
-      "10m";
-
   public static final String DFS_METRICS_PERCENTILES_INTERVALS_KEY =
       "dfs.metrics.percentiles.intervals";
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
index 1710090..9458556 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachin
 import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
@@ -295,10 +296,10 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
     MutableVolumeSet volumeSet =
         getDatanodeStateMachine().getContainer().getVolumeSet();
 
-    Map<String, HddsVolume> volumeMap = volumeSet.getVolumeMap();
+    Map<String, StorageVolume> volumeMap = volumeSet.getVolumeMap();
 
-    for (Map.Entry<String, HddsVolume> entry : volumeMap.entrySet()) {
-      HddsVolume hddsVolume = entry.getValue();
+    for (Map.Entry<String, StorageVolume> entry : volumeMap.entrySet()) {
+      HddsVolume hddsVolume = (HddsVolume) entry.getValue();
       boolean result = HddsVolumeUtil.checkVolume(hddsVolume, scmId,
           clusterId, LOG);
       if (!result) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java
index 8599b49..0222050 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.container.common.impl;
 
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto;
+import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.StorageTypeProto;
@@ -149,9 +151,9 @@ public final class StorageLocationReport implements
   }
 
   /**
-   * Returns the SCMStorageReport protoBuf message for the Storage Location
+   * Returns the StorageReportProto protoBuf message for the Storage Location
    * report.
-   * @return SCMStorageReport
+   * @return StorageReportProto
    * @throws IOException In case, the storage type specified is invalid.
    */
   public StorageReportProto getProtoBufMessage() throws IOException {
@@ -167,6 +169,25 @@ public final class StorageLocationReport implements
   }
 
   /**
+   * Returns the MetadataStorageReportProto protoBuf message for the
+   * Storage Location report.
+   * @return MetadataStorageReportProto
+   * @throws IOException In case, the storage type specified is invalid.
+   */
+  public MetadataStorageReportProto getMetadataProtoBufMessage()
+      throws IOException {
+    MetadataStorageReportProto.Builder srb =
+        MetadataStorageReportProto.newBuilder();
+    return srb.setCapacity(getCapacity())
+        .setScmUsed(getScmUsed())
+        .setRemaining(getRemaining())
+        .setStorageType(getStorageTypeProto())
+        .setStorageLocation(getStorageLocation())
+        .setFailed(isFailed())
+        .build();
+  }
+
+  /**
    * Returns the StorageLocationReport from the protoBuf message.
    * @param report SCMStorageReport
    * @return StorageLocationReport
@@ -198,6 +219,36 @@ public final class StorageLocationReport implements
   }
 
   /**
+   * Returns the StorageLocationReport from the protoBuf message.
+   * @param report MetadataStorageReportProto
+   * @return StorageLocationReport
+   * @throws IOException in case of invalid storage type
+   */
+
+  public static StorageLocationReport getMetadataFromProtobuf(
+      MetadataStorageReportProto report) throws IOException {
+    StorageLocationReport.Builder builder = StorageLocationReport.newBuilder();
+    builder.setStorageLocation(report.getStorageLocation());
+    if (report.hasCapacity()) {
+      builder.setCapacity(report.getCapacity());
+    }
+    if (report.hasScmUsed()) {
+      builder.setScmUsed(report.getScmUsed());
+    }
+    if (report.hasStorageType()) {
+      builder.setStorageType(getStorageType(report.getStorageType()));
+    }
+    if (report.hasRemaining()) {
+      builder.setRemaining(report.getRemaining());
+    }
+
+    if (report.hasFailed()) {
+      builder.setFailed(report.getFailed());
+    }
+    return builder.build();
+  }
+
+  /**
    * Returns StorageLocation.Builder instance.
    *
    * @return StorageLocation.Builder
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
index c5885c5..fc90344 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
@@ -43,8 +43,14 @@ public class DatanodeConfiguration {
       "hdds.datanode.container.delete.threads.max";
   static final String PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY =
       "hdds.datanode.periodic.disk.check.interval.minutes";
-  public static final String FAILED_VOLUMES_TOLERATED_KEY =
-      "hdds.datanode.failed.volumes.tolerated";
+  public static final String FAILED_DATA_VOLUMES_TOLERATED_KEY =
+      "hdds.datanode.failed.data.volumes.tolerated";
+  public static final String FAILED_METADATA_VOLUMES_TOLERATED_KEY =
+      "hdds.datanode.failed.metadata.volumes.tolerated";
+  public static final String DISK_CHECK_MIN_GAP_KEY =
+      "hdds.datanode.disk.check.min.gap";
+  public static final String DISK_CHECK_TIMEOUT_KEY =
+      "hdds.datanode.disk.check.timeout";
 
   static final int REPLICATION_MAX_STREAMS_DEFAULT = 10;
 
@@ -52,6 +58,12 @@ public class DatanodeConfiguration {
 
   static final int FAILED_VOLUMES_TOLERATED_DEFAULT = -1;
 
+  static final long DISK_CHECK_MIN_GAP_DEFAULT =
+      Duration.ofMinutes(15).toMillis();
+
+  static final long DISK_CHECK_TIMEOUT_DEFAULT =
+      Duration.ofMinutes(10).toMillis();
+
   /**
    * The maximum number of replication commands a single datanode can execute
    * simultaneously.
@@ -127,16 +139,48 @@ public class DatanodeConfiguration {
   private long periodicDiskCheckIntervalMinutes =
       PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT;
 
-  @Config(key = "failed.volumes.tolerated",
+  @Config(key = "failed.data.volumes.tolerated",
+      defaultValue = "-1",
+      type = ConfigType.INT,
+      tags = { DATANODE },
+      description = "The number of data volumes that are allowed to fail "
+          + "before a datanode stops offering service. "
+          + "Config this to -1 means unlimited, but we should have "
+          + "at least one good volume left."
+  )
+  private int failedDataVolumesTolerated = FAILED_VOLUMES_TOLERATED_DEFAULT;
+
+  @Config(key = "failed.metadata.volumes.tolerated",
       defaultValue = "-1",
       type = ConfigType.INT,
       tags = { DATANODE },
-      description = "The number of volumes that are allowed to fail "
+      description = "The number of metadata volumes that are allowed to fail "
           + "before a datanode stops offering service. "
           + "Config this to -1 means unlimited, but we should have "
           + "at least one good volume left."
   )
-  private int failedVolumesTolerated = FAILED_VOLUMES_TOLERATED_DEFAULT;
+  private int failedMetadataVolumesTolerated = FAILED_VOLUMES_TOLERATED_DEFAULT;
+
+  @Config(key = "disk.check.min.gap",
+      defaultValue = "15m",
+      type = ConfigType.TIME,
+      tags = { DATANODE },
+      description = "The minimum gap between two successive checks of the same"
+          + " Datanode volume. Unit could be defined with"
+          + " postfix (ns,ms,s,m,h,d)."
+  )
+  private long diskCheckMinGap = DISK_CHECK_MIN_GAP_DEFAULT;
+
+  @Config(key = "disk.check.timeout",
+      defaultValue = "10m",
+      type = ConfigType.TIME,
+      tags = { DATANODE },
+      description = "Maximum allowed time for a disk check to complete."
+          + " If the check does not complete within this time interval"
+          + " then the disk is declared as failed. Unit could be defined with"
+          + " postfix (ns,ms,s,m,h,d)."
+  )
+  private long diskCheckTimeout = DISK_CHECK_TIMEOUT_DEFAULT;
 
   @PostConstruct
   public void validate() {
@@ -163,11 +207,32 @@ public class DatanodeConfiguration {
           PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT;
     }
 
-    if (failedVolumesTolerated < -1) {
-      LOG.warn(FAILED_VOLUMES_TOLERATED_KEY +
+    if (failedDataVolumesTolerated < -1) {
+      LOG.warn(FAILED_DATA_VOLUMES_TOLERATED_KEY +
           "must be greater than -1 and was set to {}. Defaulting to {}",
-          failedVolumesTolerated, FAILED_VOLUMES_TOLERATED_DEFAULT);
-      failedVolumesTolerated = FAILED_VOLUMES_TOLERATED_DEFAULT;
+          failedDataVolumesTolerated, FAILED_VOLUMES_TOLERATED_DEFAULT);
+      failedDataVolumesTolerated = FAILED_VOLUMES_TOLERATED_DEFAULT;
+    }
+
+    if (failedMetadataVolumesTolerated < -1) {
+      LOG.warn(FAILED_METADATA_VOLUMES_TOLERATED_KEY +
+              "must be greater than -1 and was set to {}. Defaulting to {}",
+          failedMetadataVolumesTolerated, FAILED_VOLUMES_TOLERATED_DEFAULT);
+      failedMetadataVolumesTolerated = FAILED_VOLUMES_TOLERATED_DEFAULT;
+    }
+
+    if (diskCheckMinGap < 0) {
+      LOG.warn(DISK_CHECK_MIN_GAP_KEY +
+              " must be greater than zero and was set to {}. Defaulting to {}",
+          diskCheckMinGap, DISK_CHECK_MIN_GAP_DEFAULT);
+      diskCheckMinGap = DISK_CHECK_MIN_GAP_DEFAULT;
+    }
+
+    if (diskCheckTimeout < 0) {
+      LOG.warn(DISK_CHECK_TIMEOUT_KEY +
+              " must be greater than zero and was set to {}. Defaulting to {}",
+          diskCheckTimeout, DISK_CHECK_TIMEOUT_DEFAULT);
+      diskCheckTimeout = DISK_CHECK_TIMEOUT_DEFAULT;
     }
   }
 
@@ -196,11 +261,35 @@ public class DatanodeConfiguration {
     this.periodicDiskCheckIntervalMinutes = periodicDiskCheckIntervalMinutes;
   }
 
-  public int getFailedVolumesTolerated() {
-    return failedVolumesTolerated;
+  public int getFailedDataVolumesTolerated() {
+    return failedDataVolumesTolerated;
+  }
+
+  public void setFailedDataVolumesTolerated(int failedVolumesTolerated) {
+    this.failedDataVolumesTolerated = failedVolumesTolerated;
+  }
+
+  public int getFailedMetadataVolumesTolerated() {
+    return failedMetadataVolumesTolerated;
+  }
+
+  public void setFailedMetadataVolumesTolerated(int failedVolumesTolerated) {
+    this.failedMetadataVolumesTolerated = failedVolumesTolerated;
+  }
+
+  public Duration getDiskCheckMinGap() {
+    return Duration.ofMillis(diskCheckMinGap);
+  }
+
+  public void setDiskCheckMinGap(Duration duration) {
+    this.diskCheckMinGap = duration.toMillis();
+  }
+
+  public Duration getDiskCheckTimeout() {
+    return Duration.ofMillis(diskCheckTimeout);
   }
 
-  public void setFailedVolumesTolerated(int failedVolumesTolerated) {
-    this.failedVolumesTolerated = failedVolumesTolerated;
+  public void setDiskCheckTimeout(Duration duration) {
+    this.diskCheckTimeout = duration.toMillis();
   }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 544e4c5..51261d3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -275,7 +275,9 @@ public class DatanodeStateMachine implements Closeable {
 
   public void handleFatalVolumeFailures() {
     LOG.error("DatanodeStateMachine Shutdown due to too many bad volumes, "
-        + "check " + DatanodeConfiguration.FAILED_VOLUMES_TOLERATED_KEY);
+        + "check " + DatanodeConfiguration.FAILED_DATA_VOLUMES_TOLERATED_KEY
+        + " and "
+        + DatanodeConfiguration.FAILED_METADATA_VOLUMES_TOLERATED_KEY);
     hddsDatanodeStopService.stopService();
   }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
index 4762c78..d067000 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachin
 import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -81,7 +82,7 @@ public class VersionEndpointTask implements
           MutableVolumeSet volumeSet = ozoneContainer.getVolumeSet();
           volumeSet.writeLock();
           try {
-            Map<String, HddsVolume> volumeMap = volumeSet.getVolumeMap();
+            Map<String, StorageVolume> volumeMap = volumeSet.getVolumeMap();
 
             Preconditions.checkNotNull(scmId,
                 "Reply from SCM: scmId cannot be null");
@@ -91,12 +92,13 @@ public class VersionEndpointTask implements
             // If version file does not exist
             // create version file and also set scmId
 
-            for (Map.Entry<String, HddsVolume> entry : volumeMap.entrySet()) {
-              HddsVolume hddsVolume = entry.getValue();
-              boolean result = HddsVolumeUtil.checkVolume(hddsVolume, scmId,
-                  clusterId, LOG);
+            for (Map.Entry<String, StorageVolume> entry
+                : volumeMap.entrySet()) {
+              StorageVolume volume = entry.getValue();
+              boolean result = HddsVolumeUtil.checkVolume((HddsVolume) volume,
+                  scmId, clusterId, LOG);
               if (!result) {
-                volumeSet.failVolume(hddsVolume.getHddsRootDir().getPath());
+                volumeSet.failVolume(volume.getStorageDir().getPath());
               }
             }
             if (volumeSet.getVolumesList().size() == 0) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 3a2aec9..867127e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -32,7 +32,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
-import java.util.EnumMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -50,7 +49,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
@@ -65,14 +63,10 @@ import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
-import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
@@ -152,11 +146,6 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   private long requestTimeout;
   private boolean shouldDeleteRatisLogDirectory;
 
-  /**
-   * Maintains a list of active volumes per StorageType.
-   */
-  private EnumMap<StorageType, List<String>> ratisVolumeMap;
-
   private XceiverServerRatis(DatanodeDetails dd,
       ContainerDispatcher dispatcher, ContainerController containerController,
       StateContext context, ConfigurationSource conf, Parameters parameters)
@@ -188,7 +177,6 @@ public final class XceiverServerRatis implements XceiverServerSpi {
         HddsConfigKeys.HDDS_DATANODE_RATIS_SERVER_REQUEST_TIMEOUT,
         HddsConfigKeys.HDDS_DATANODE_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT,
         TimeUnit.MILLISECONDS);
-    initializeRatisVolumeMap();
   }
 
   private void assignPorts() {
@@ -600,43 +588,6 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     }
   }
 
-  private void  initializeRatisVolumeMap() throws IOException {
-    ratisVolumeMap = new EnumMap<>(StorageType.class);
-    Collection<String> rawLocations = HddsServerUtil.
-            getOzoneDatanodeRatisDirectory(conf);
-
-    for (String locationString : rawLocations) {
-      try {
-        StorageLocation location = StorageLocation.parse(locationString);
-        StorageType type = location.getStorageType();
-        ratisVolumeMap.computeIfAbsent(type, k -> new ArrayList<String>(1));
-        ratisVolumeMap.get(location.getStorageType()).
-                add(location.getUri().getPath());
-
-      } catch (IOException e) {
-        LOG.error("Failed to parse the storage location: " +
-                locationString, e);
-      }
-    }
-  }
-
-  @Override
-  public List<MetadataStorageReportProto> getStorageReport()
-      throws IOException {
-    List<MetadataStorageReportProto> reportProto = new ArrayList<>();
-    for (StorageType storageType : ratisVolumeMap.keySet()) {
-      for (String path : ratisVolumeMap.get(storageType)) {
-        MetadataStorageReportProto.Builder builder = MetadataStorageReportProto.
-                newBuilder();
-        builder.setStorageLocation(path);
-        builder.setStorageType(StorageLocationReport.
-                getStorageTypeProto(storageType));
-        reportProto.add(builder.build());
-      }
-    }
-    return reportProto;
-  }
-
   private RaftClientRequest createRaftClientRequest(
       ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID,
       RaftClientRequest.Type type) {
@@ -933,7 +884,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
             .DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_PER_VOLUME_DEFAULT);
 
     final int numberOfDisks =
-        MutableVolumeSet.getDatanodeStorageDirs(conf).size();
+        HddsServerUtil.getDatanodeStorageDirs(conf).size();
 
     ThreadPoolExecutor[] executors =
         new ThreadPoolExecutor[threadCountPerDisk * numberOfDisks];
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
index 0372236..9d3810e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
@@ -24,8 +24,6 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
 import org.apache.hadoop.ozone.container.common.DataNodeLayoutVersion;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
-import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
@@ -236,13 +234,4 @@ public final class HddsVolumeUtil {
     }
 
   }
-
-  public static void onFailure(HddsVolume volume) {
-    if (volume != null) {
-      VolumeSet volumeSet = volume.getVolumeSet();
-      if (volumeSet != null && volumeSet instanceof MutableVolumeSet) {
-        ((MutableVolumeSet) volumeSet).checkVolumeAsync(volume);
-      }
-    }
-  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/StorageVolumeUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/StorageVolumeUtil.java
new file mode 100644
index 0000000..104dbac
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/StorageVolumeUtil.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.utils;
+
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A util class for {@link StorageVolume}.
+ */
+public final class StorageVolumeUtil {
+
+  private StorageVolumeUtil() {
+  }
+
+  public static void onFailure(StorageVolume volume) {
+    if (volume != null) {
+      VolumeSet volumeSet = volume.getVolumeSet();
+      if (volumeSet != null && volumeSet instanceof MutableVolumeSet) {
+        ((MutableVolumeSet) volumeSet).checkVolumeAsync(volume);
+      }
+    }
+  }
+
+  public static List<HddsVolume> getHddsVolumesList(
+      List<StorageVolume> volumes) {
+    return volumes.stream().
+        map(v -> (HddsVolume) v).collect(Collectors.toList());
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
index 868d09c..6da4599 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
@@ -18,27 +18,18 @@
 
 package org.apache.hadoop.ozone.container.common.volume;
 
-import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
-import java.util.Objects;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory;
-import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
-import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
-import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
 import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
 import org.apache.hadoop.ozone.container.common.DataNodeLayoutVersion;
 import org.apache.hadoop.ozone.container.common.helpers.DatanodeVersionFile;
 import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
-import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.base.Preconditions;
@@ -67,18 +58,14 @@ import org.slf4j.LoggerFactory;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 @SuppressWarnings("finalclass")
-public class HddsVolume
-    implements Checkable<Boolean, VolumeCheckResult> {
+public class HddsVolume extends StorageVolume {
 
   private static final Logger LOG = LoggerFactory.getLogger(HddsVolume.class);
 
   public static final String HDDS_VOLUME_DIR = "hdds";
 
-  private final File hddsRootDir;
-  private final VolumeInfo volumeInfo;
   private VolumeState state;
   private final VolumeIOStats volumeIOStats;
-  private final VolumeSet volumeSet;
 
   // VERSION file properties
   private String storageID;       // id of the file system
@@ -89,46 +76,18 @@ public class HddsVolume
   private final AtomicLong committedBytes; // till Open containers become full
 
   /**
-   * Run a check on the current volume to determine if it is healthy.
-   * @param unused context for the check, ignored.
-   * @return result of checking the volume.
-   * @throws Exception if an exception was encountered while running
-   *            the volume check.
-   */
-  @Override
-  public VolumeCheckResult check(@Nullable Boolean unused) throws Exception {
-    if (!hddsRootDir.exists()) {
-      return VolumeCheckResult.FAILED;
-    }
-    DiskChecker.checkDir(hddsRootDir);
-    return VolumeCheckResult.HEALTHY;
-  }
-
-  /**
    * Builder for HddsVolume.
    */
-  public static class Builder {
-    private final String volumeRootStr;
-    private ConfigurationSource conf;
-    private StorageType storageType;
-
+  public static class Builder extends StorageVolume.Builder<Builder> {
     private String datanodeUuid;
     private String clusterID;
-    private boolean failedVolume = false;
-    private SpaceUsageCheckFactory usageCheckFactory;
-    private VolumeSet volumeSet;
 
-    public Builder(String rootDirStr) {
-      this.volumeRootStr = rootDirStr;
+    public Builder(String volumeRootStr) {
+      super(volumeRootStr, HDDS_VOLUME_DIR);
     }
 
-    public Builder conf(ConfigurationSource config) {
-      this.conf = config;
-      return this;
-    }
-
-    public Builder storageType(StorageType st) {
-      this.storageType = st;
+    @Override
+    public Builder getThis() {
       return this;
     }
 
@@ -142,64 +101,34 @@ public class HddsVolume
       return this;
     }
 
-    // This is added just to create failed volume objects, which will be used
-    // to create failed HddsVolume objects in the case of any exceptions caused
-    // during creating HddsVolume object.
-    public Builder failedVolume(boolean failed) {
-      this.failedVolume = failed;
-      return this;
-    }
-
-    public Builder usageCheckFactory(SpaceUsageCheckFactory factory) {
-      usageCheckFactory = factory;
-      return this;
-    }
-
-    public Builder volumeSet(VolumeSet volSet) {
-      this.volumeSet = volSet;
-      return this;
-    }
-
     public HddsVolume build() throws IOException {
       return new HddsVolume(this);
     }
   }
 
   private HddsVolume(Builder b) throws IOException {
-    if (!b.failedVolume) {
-      StorageLocation location = StorageLocation.parse(b.volumeRootStr);
-      hddsRootDir = new File(location.getUri().getPath(), HDDS_VOLUME_DIR);
+    super(b);
+
+    if (!b.getFailedVolume()) {
       this.state = VolumeState.NOT_INITIALIZED;
       this.clusterID = b.clusterID;
       this.datanodeUuid = b.datanodeUuid;
-      this.volumeIOStats = new VolumeIOStats(b.volumeRootStr);
-
-      volumeInfo = new VolumeInfo.Builder(b.volumeRootStr, b.conf)
-          .storageType(b.storageType)
-          .usageCheckFactory(b.usageCheckFactory)
-          .build();
+      this.volumeIOStats = new VolumeIOStats(b.getVolumeRootStr());
       this.committedBytes = new AtomicLong(0);
-      this.volumeSet = b.volumeSet;
 
-      LOG.info("Creating Volume: {} of storage type : {} and capacity : {}",
-          hddsRootDir, b.storageType, volumeInfo.getCapacity());
+      LOG.info("Creating HddsVolume: {} of storage type : {} capacity : {}",
+          getStorageDir(), b.getStorageType(), getVolumeInfo().getCapacity());
 
       initialize();
     } else {
       // Builder is called with failedVolume set, so create a failed volume
-      // HddsVolumeObject.
-      hddsRootDir = new File(b.volumeRootStr);
+      // HddsVolume Object.
       volumeIOStats = null;
-      volumeInfo = null;
       storageID = UUID.randomUUID().toString();
       state = VolumeState.FAILED;
       committedBytes = null;
-      volumeSet = null;
     }
-  }
 
-  public VolumeInfo getVolumeInfo() {
-    return volumeInfo;
   }
 
   /**
@@ -213,8 +142,8 @@ public class HddsVolume
     switch (intialVolumeState) {
     case NON_EXISTENT:
       // Root directory does not exist. Create it.
-      if (!hddsRootDir.mkdirs()) {
-        throw new IOException("Cannot create directory " + hddsRootDir);
+      if (!getStorageDir().mkdirs()) {
+        throw new IOException("Cannot create directory " + getStorageDir());
       }
       setState(VolumeState.NOT_FORMATTED);
       createVersionFile();
@@ -231,26 +160,26 @@ public class HddsVolume
     case INCONSISTENT:
       // Volume Root is in an inconsistent state. Skip loading this volume.
       throw new IOException("Volume is in an " + VolumeState.INCONSISTENT +
-          " state. Skipped loading volume: " + hddsRootDir.getPath());
+          " state. Skipped loading volume: " + getStorageDir().getPath());
     default:
       throw new IOException("Unrecognized initial state : " +
-          intialVolumeState + "of volume : " + hddsRootDir);
+          intialVolumeState + "of volume : " + getStorageDir());
     }
   }
 
   private VolumeState analyzeVolumeState() {
-    if (!hddsRootDir.exists()) {
+    if (!getStorageDir().exists()) {
       // Volume Root does not exist.
       return VolumeState.NON_EXISTENT;
     }
-    if (!hddsRootDir.isDirectory()) {
+    if (!getStorageDir().isDirectory()) {
       // Volume Root exists but is not a directory.
       LOG.warn("Volume {} exists but is not a directory,"
           + " current volume state: {}.",
-          hddsRootDir.getPath(), VolumeState.INCONSISTENT);
+          getStorageDir().getPath(), VolumeState.INCONSISTENT);
       return VolumeState.INCONSISTENT;
     }
-    File[] files = hddsRootDir.listFiles();
+    File[] files = getStorageDir().listFiles();
     if (files == null || files.length == 0) {
       // Volume Root exists and is empty.
       return VolumeState.NOT_FORMATTED;
@@ -259,7 +188,7 @@ public class HddsVolume
       // Volume Root is non empty but VERSION file does not exist.
       LOG.warn("VERSION file does not exist in volume {},"
           + " current volume state: {}.",
-          hddsRootDir.getPath(), VolumeState.INCONSISTENT);
+          getStorageDir().getPath(), VolumeState.INCONSISTENT);
       return VolumeState.INCONSISTENT;
     }
     // Volume Root and VERSION file exist.
@@ -286,7 +215,7 @@ public class HddsVolume
       // HddsDatanodeService does not have the cluster information yet. Wait
       // for registration with SCM.
       LOG.debug("ClusterID not available. Cannot format the volume {}",
-          this.hddsRootDir.getPath());
+          getStorageDir().getPath());
       setState(VolumeState.NOT_FORMATTED);
     } else {
       // Write the version file to disk.
@@ -342,20 +271,14 @@ public class HddsVolume
   }
 
   private File getVersionFile() {
-    return HddsVolumeUtil.getVersionFile(hddsRootDir);
+    return HddsVolumeUtil.getVersionFile(super.getStorageDir());
   }
 
   public File getHddsRootDir() {
-    return hddsRootDir;
-  }
-
-  public StorageType getStorageType() {
-    if(volumeInfo != null) {
-      return volumeInfo.getStorageType();
-    }
-    return StorageType.DEFAULT;
+    return super.getStorageDir();
   }
 
+  @Override
   public String getStorageID() {
     return storageID;
   }
@@ -380,18 +303,6 @@ public class HddsVolume
     return state;
   }
 
-  public long getCapacity() {
-    return volumeInfo != null ? volumeInfo.getCapacity() : 0;
-  }
-
-  public long getAvailable() {
-    return volumeInfo != null ? volumeInfo.getAvailable() : 0;
-  }
-
-  public long getUsedSpace() {
-    return volumeInfo != null ? volumeInfo.getScmUsed() : 0;
-  }
-
   public void setState(VolumeState state) {
     this.state = state;
   }
@@ -404,25 +315,19 @@ public class HddsVolume
     return volumeIOStats;
   }
 
-  public VolumeSet getVolumeSet() {
-    return volumeSet;
-  }
-
+  @Override
   public void failVolume() {
     setState(VolumeState.FAILED);
-    if (volumeInfo != null) {
-      volumeInfo.shutdownUsageThread();
-    }
+    super.failVolume();
     if (volumeIOStats != null) {
       volumeIOStats.unregister();
     }
   }
 
+  @Override
   public void shutdown() {
     this.state = VolumeState.NON_EXISTENT;
-    if (volumeInfo != null) {
-      volumeInfo.shutdownUsageThread();
-    }
+    super.shutdown();
     if (volumeIOStats != null) {
       volumeIOStats.unregister();
     }
@@ -465,24 +370,4 @@ public class HddsVolume
   public long getCommittedBytes() {
     return committedBytes.get();
   }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(hddsRootDir);
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    return this == other
-        || other instanceof HddsVolume && ((HddsVolume) other).hddsRootDir
-        .equals(this.hddsRootDir);
-  }
-
-  /**
-   * Override toSting() to show the path of HddsVolume.
-   */
-  @Override
-  public String toString() {
-    return getHddsRootDir().toString();
-  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolumeFactory.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolumeFactory.java
new file mode 100644
index 0000000..3b7b108
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolumeFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory;
+import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
+
+import java.io.IOException;
+
+/**
+ * A factory class for HddsVolume.
+ */
+public class HddsVolumeFactory extends StorageVolumeFactory {
+
+  private String datanodeUuid;
+  private String clusterID;
+
+  public HddsVolumeFactory(ConfigurationSource conf,
+      SpaceUsageCheckFactory usageCheckFactory, MutableVolumeSet volumeSet,
+      String datanodeUuid, String clusterID) {
+    super(conf, usageCheckFactory, volumeSet);
+    this.datanodeUuid = datanodeUuid;
+    this.clusterID = clusterID;
+  }
+
+  @Override
+  public StorageVolume createVolume(String locationString,
+      StorageType storageType) throws IOException {
+    HddsVolume.Builder volumeBuilder = new HddsVolume.Builder(locationString)
+        .conf(getConf())
+        .datanodeUuid(datanodeUuid)
+        .clusterID(clusterID)
+        .usageCheckFactory(getUsageCheckFactory())
+        .storageType(storageType)
+        .volumeSet(getVolumeSet());
+    HddsVolume volume = volumeBuilder.build();
+
+    checkAndSetClusterID(volume.getClusterID());
+
+    return volume;
+  }
+
+  @Override
+  public StorageVolume createFailedVolume(String locationString)
+      throws IOException {
+    HddsVolume.Builder volumeBuilder = new HddsVolume.Builder(locationString)
+        .failedVolume(true);
+    return volumeBuilder.build();
+  }
+
+  /**
+   * If Version file exists and the {@link #clusterID} is not set yet,
+   * assign it the value from Version file. Otherwise, check that the given
+   * id matches with the id from version file.
+   * @param idFromVersionFile value of the property from Version file
+   * @throws InconsistentStorageStateException
+   */
+  private void checkAndSetClusterID(String idFromVersionFile)
+      throws InconsistentStorageStateException {
+    // If the clusterID is null (not set), assign it the value
+    // from version file.
+    if (this.clusterID == null) {
+      this.clusterID = idFromVersionFile;
+      return;
+    }
+
+    // If the clusterID is already set, it should match with the value from the
+    // version file.
+    if (!idFromVersionFile.equals(this.clusterID)) {
+      throw new InconsistentStorageStateException(
+          "Mismatched ClusterIDs. VolumeSet has: " + this.clusterID +
+              ", and version file has: " + idFromVersionFile);
+    }
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/ImmutableVolumeSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/ImmutableVolumeSet.java
index 6b2dd33..e5cd341 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/ImmutableVolumeSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/ImmutableVolumeSet.java
@@ -22,18 +22,18 @@ import com.google.common.collect.ImmutableList;
 import java.util.List;
 
 /**
- * Fixed list of HDDS volumes.
+ * Fixed list of volumes.
  */
 public final class ImmutableVolumeSet implements VolumeSet {
 
-  private final List<HddsVolume> volumes;
+  private final List<StorageVolume> volumes;
 
-  public ImmutableVolumeSet(HddsVolume... volumes) {
+  public ImmutableVolumeSet(StorageVolume... volumes) {
     this.volumes = ImmutableList.copyOf(volumes);
   }
 
   @Override
-  public List<HddsVolume> getVolumesList() {
+  public List<StorageVolume> getVolumesList() {
     return volumes;
   }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/ImmutableVolumeSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolume.java
similarity index 52%
copy from hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/ImmutableVolumeSet.java
copy to hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolume.java
index 6b2dd33..c5532ff 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/ImmutableVolumeSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolume.java
@@ -6,54 +6,47 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.ozone.container.common.volume;
 
-import com.google.common.collect.ImmutableList;
+package org.apache.hadoop.ozone.container.common.volume;
 
-import java.util.List;
+import java.io.IOException;
 
 /**
- * Fixed list of HDDS volumes.
+ * MetadataVolume represents a volume in datanode for metadata(ratis).
+ * Datanode itself doesn't consume this volume, but only manages checks
+ * and volume info for it.
  */
-public final class ImmutableVolumeSet implements VolumeSet {
-
-  private final List<HddsVolume> volumes;
-
-  public ImmutableVolumeSet(HddsVolume... volumes) {
-    this.volumes = ImmutableList.copyOf(volumes);
-  }
+public class MetadataVolume extends StorageVolume {
 
-  @Override
-  public List<HddsVolume> getVolumesList() {
-    return volumes;
+  protected MetadataVolume(Builder b) throws IOException {
+    super(b);
   }
 
-  @Override
-  public void readLock() {
-    // no-op, immutable
-  }
+  /**
+   * Builder class for MetadataVolume.
+   */
+  public static class Builder extends StorageVolume.Builder<Builder> {
 
-  @Override
-  public void readUnlock() {
-    // no-op, immutable
-  }
+    public Builder(String volumeRootStr) {
+      super(volumeRootStr, "");
+    }
 
-  @Override
-  public void writeLock() {
-    // no-op, immutable
-  }
+    @Override
+    public Builder getThis() {
+      return this;
+    }
 
-  @Override
-  public void writeUnlock() {
-    // no-op, immutable
+    public MetadataVolume build() throws IOException {
+      return new MetadataVolume(this);
+    }
   }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolumeFactory.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolumeFactory.java
new file mode 100644
index 0000000..b83cb38
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolumeFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory;
+
+import java.io.IOException;
+
+/**
+ * A factory class for MetadataVolume.
+ */
+public class MetadataVolumeFactory extends StorageVolumeFactory {
+
+  public MetadataVolumeFactory(ConfigurationSource conf,
+      SpaceUsageCheckFactory usageCheckFactory, MutableVolumeSet volumeSet) {
+    super(conf, usageCheckFactory, volumeSet);
+  }
+
+  @Override
+  StorageVolume createVolume(String locationString, StorageType storageType)
+      throws IOException {
+    MetadataVolume.Builder volumeBuilder =
+        new MetadataVolume.Builder(locationString)
+            .conf(getConf())
+            .usageCheckFactory(getUsageCheckFactory())
+            .storageType(storageType)
+            .volumeSet(getVolumeSet());
+    return volumeBuilder.build();
+  }
+
+  @Override
+  StorageVolume createFailedVolume(String locationString) throws IOException {
+    MetadataVolume.Builder volumeBuilder =
+        new MetadataVolume.Builder(locationString)
+            .failedVolume(true);
+    return volumeBuilder.build();
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java
index 7b0a1c1..76a576b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java
@@ -27,38 +27,30 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
-import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
 import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
-import org.apache.hadoop.ozone.container.common.volume.HddsVolume.VolumeState;
-import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.util.Timer;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import static org.apache.hadoop.hdds.DFSConfigKeysLegacy.DFS_DATANODE_DATA_DIR_KEY;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
 import static org.apache.hadoop.util.RunJar.SHUTDOWN_HOOK_PRIORITY;
+
+import org.apache.hadoop.util.Timer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * VolumeSet to manage HDDS volumes in a DataNode.
+ * VolumeSet to manage volumes in a DataNode.
  */
 public class MutableVolumeSet implements VolumeSet {
 
@@ -71,24 +63,17 @@ public class MutableVolumeSet implements VolumeSet {
    * Maintains a map of all active volumes in the DataNode.
    * Each volume has one-to-one mapping with a volumeInfo object.
    */
-  private Map<String, HddsVolume> volumeMap;
+  private Map<String, StorageVolume> volumeMap;
   /**
    * Maintains a map of volumes which have failed. The keys in this map and
    * {@link #volumeMap} are mutually exclusive.
    */
-  private Map<String, HddsVolume> failedVolumeMap;
+  private Map<String, StorageVolume> failedVolumeMap;
 
   /**
    * Maintains a list of active volumes per StorageType.
    */
-  private EnumMap<StorageType, List<HddsVolume>> volumeStateMap;
-
-  /**
-   * An executor for periodic disk checks.
-   */
-  private final ScheduledExecutorService diskCheckerservice;
-  private final ScheduledFuture<?> periodicDiskChecker;
-  private final SpaceUsageCheckFactory usageCheckFactory;
+  private EnumMap<StorageType, List<StorageVolume>> volumeStateMap;
 
   /**
    * A Reentrant Read Write Lock to synchronize volume operations in VolumeSet.
@@ -101,46 +86,47 @@ public class MutableVolumeSet implements VolumeSet {
   private String clusterID;
 
   private Runnable shutdownHook;
-  private final HddsVolumeChecker volumeChecker;
+  private final StorageVolumeChecker volumeChecker;
   private Runnable failedVolumeListener;
   private StateContext context;
+  private final StorageVolumeFactory volumeFactory;
+  private final StorageVolume.VolumeType volumeType;
+  private int maxVolumeFailuresTolerated;
 
   public MutableVolumeSet(String dnUuid, ConfigurationSource conf,
-      StateContext context) throws IOException {
-    this(dnUuid, null, conf, context);
+      StateContext context, StorageVolume.VolumeType volumeType,
+      StorageVolumeChecker volumeChecker) throws IOException {
+    this(dnUuid, null, conf, context, volumeType, volumeChecker);
   }
 
   public MutableVolumeSet(String dnUuid, String clusterID,
-      ConfigurationSource conf, StateContext context)
-      throws IOException {
+      ConfigurationSource conf, StateContext context,
+      StorageVolume.VolumeType volumeType, StorageVolumeChecker volumeChecker
+  ) throws IOException {
     this.context = context;
     this.datanodeUuid = dnUuid;
     this.clusterID = clusterID;
     this.conf = conf;
     this.volumeSetRWLock = new ReentrantReadWriteLock();
-    this.volumeChecker = getVolumeChecker(conf);
-    this.diskCheckerservice = Executors.newScheduledThreadPool(
-        1, r -> {
-            Thread t = new Thread(r, "Periodic HDDS volume checker");
-            t.setDaemon(true);
-            return t;
-        });
+    this.volumeChecker = volumeChecker;
+    if (this.volumeChecker != null) {
+      this.volumeChecker.registerVolumeSet(this);
+    }
+    this.volumeType = volumeType;
 
+    SpaceUsageCheckFactory usageCheckFactory =
+        SpaceUsageCheckFactory.create(conf);
     DatanodeConfiguration dnConf =
         conf.getObject(DatanodeConfiguration.class);
-    long periodicDiskCheckIntervalMinutes =
-        dnConf.getPeriodicDiskCheckIntervalMinutes();
-    this.periodicDiskChecker =
-      diskCheckerservice.scheduleWithFixedDelay(() -> {
-        try {
-          checkAllVolumes();
-        } catch (IOException e) {
-          LOG.warn("Exception while checking disks", e);
-        }
-      }, periodicDiskCheckIntervalMinutes, periodicDiskCheckIntervalMinutes,
-        TimeUnit.MINUTES);
-
-    usageCheckFactory = SpaceUsageCheckFactory.create(conf);
+    if (volumeType == StorageVolume.VolumeType.META_VOLUME) {
+      this.volumeFactory = new MetadataVolumeFactory(conf, usageCheckFactory,
+          this);
+      maxVolumeFailuresTolerated = dnConf.getFailedMetadataVolumesTolerated();
+    } else {
+      this.volumeFactory = new HddsVolumeFactory(conf, usageCheckFactory,
+          this, datanodeUuid, clusterID);
+      maxVolumeFailuresTolerated = dnConf.getFailedDataVolumesTolerated();
+    }
 
     initializeVolumeSet();
   }
@@ -150,10 +136,10 @@ public class MutableVolumeSet implements VolumeSet {
     this.clusterID = null;
     this.conf = conf;
     this.volumeSetRWLock = new ReentrantReadWriteLock();
-    this.volumeChecker = getVolumeChecker(conf);
-    this.diskCheckerservice = null;
-    this.periodicDiskChecker = null;
-    this.usageCheckFactory = null;
+    this.volumeChecker = new StorageVolumeChecker(conf, new Timer());
+    this.volumeType = StorageVolume.VolumeType.DATA_VOLUME;
+    this.volumeFactory = new HddsVolumeFactory(conf, null,
+        this, null, null);
     initializeVolumeSet();
   }
 
@@ -162,13 +148,7 @@ public class MutableVolumeSet implements VolumeSet {
   }
 
   @VisibleForTesting
-  HddsVolumeChecker getVolumeChecker(ConfigurationSource configuration)
-      throws DiskChecker.DiskErrorException {
-    return new HddsVolumeChecker(configuration, new Timer());
-  }
-
-  @VisibleForTesting
-  public HddsVolumeChecker getVolumeChecker() {
+  public StorageVolumeChecker getVolumeChecker() {
     return volumeChecker;
   }
 
@@ -180,7 +160,12 @@ public class MutableVolumeSet implements VolumeSet {
     failedVolumeMap = new ConcurrentHashMap<>();
     volumeStateMap = new EnumMap<>(StorageType.class);
 
-    Collection<String> rawLocations = getDatanodeStorageDirs(conf);
+    Collection<String> rawLocations;
+    if (volumeType == StorageVolume.VolumeType.META_VOLUME) {
+      rawLocations = HddsServerUtil.getOzoneDatanodeRatisDirectory(conf);
+    } else {
+      rawLocations = HddsServerUtil.getDatanodeStorageDirs(conf);
+    }
 
     for (StorageType storageType : StorageType.values()) {
       volumeStateMap.put(storageType, new ArrayList<>());
@@ -190,24 +175,22 @@ public class MutableVolumeSet implements VolumeSet {
       try {
         StorageLocation location = StorageLocation.parse(locationString);
 
-        HddsVolume hddsVolume = createVolume(location.getUri().getPath(),
-            location.getStorageType());
-
-        checkAndSetClusterID(hddsVolume.getClusterID());
+        StorageVolume volume = volumeFactory.createVolume(
+            location.getUri().getPath(), location.getStorageType());
 
         LOG.info("Added Volume : {} to VolumeSet",
-            hddsVolume.getHddsRootDir().getPath());
+            volume.getStorageDir().getPath());
 
-        if (!hddsVolume.getHddsRootDir().mkdirs() &&
-            !hddsVolume.getHddsRootDir().exists()) {
-          throw new IOException("Failed to create HDDS storage dir " +
-              hddsVolume.getHddsRootDir());
+        if (!volume.getStorageDir().mkdirs() &&
+            !volume.getStorageDir().exists()) {
+          throw new IOException("Failed to create storage dir " +
+              volume.getStorageDir());
         }
-        volumeMap.put(hddsVolume.getHddsRootDir().getPath(), hddsVolume);
-        volumeStateMap.get(hddsVolume.getStorageType()).add(hddsVolume);
+        volumeMap.put(volume.getStorageDir().getPath(), volume);
+        volumeStateMap.get(volume.getStorageType()).add(volume);
       } catch (IOException e) {
-        HddsVolume volume = new HddsVolume.Builder(locationString)
-            .failedVolume(true).build();
+        StorageVolume volume =
+            volumeFactory.createFailedVolume(locationString);
         failedVolumeMap.put(locationString, volume);
         LOG.error("Failed to parse the storage location: " + locationString, e);
       }
@@ -229,27 +212,18 @@ public class MutableVolumeSet implements VolumeSet {
         SHUTDOWN_HOOK_PRIORITY);
   }
 
-  public static Collection<String> getDatanodeStorageDirs(
-      ConfigurationSource conf) {
-    Collection<String> rawLocations = conf.getTrimmedStringCollection(
-        HDDS_DATANODE_DIR_KEY);
-    if (rawLocations.isEmpty()) {
-      rawLocations = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
-    }
-    if (rawLocations.isEmpty()) {
-      throw new IllegalArgumentException("No location configured in either "
-          + HDDS_DATANODE_DIR_KEY + " or " + DFS_DATANODE_DATA_DIR_KEY);
-    }
-    return rawLocations;
-  }
-
   /**
-   * Run a synchronous parallel check of all HDDS volumes, removing
+   * Run a synchronous parallel check of all volumes, removing
    * failed volumes.
    */
-  void checkAllVolumes() throws IOException {
-    List<HddsVolume> allVolumes = getVolumesList();
-    Set<HddsVolume> failedVolumes;
+  public void checkAllVolumes() throws IOException {
+    if (volumeChecker == null) {
+      LOG.debug("No volumeChecker, skip checkAllVolumes");
+      return;
+    }
+
+    List<StorageVolume> allVolumes = getVolumesList();
+    Set<? extends StorageVolume> failedVolumes;
     try {
       failedVolumes = volumeChecker.checkAllVolumes(allVolumes);
     } catch (InterruptedException e) {
@@ -270,25 +244,23 @@ public class MutableVolumeSet implements VolumeSet {
    * Handle one or more failed volumes.
    * @param failedVolumes
    */
-  private void handleVolumeFailures(Set<HddsVolume> failedVolumes)
-      throws IOException {
+  private void handleVolumeFailures(
+      Set<? extends StorageVolume> failedVolumes) throws IOException {
     this.writeLock();
     try {
-      for (HddsVolume v : failedVolumes) {
+      for (StorageVolume v : failedVolumes) {
         // Immediately mark the volume as failed so it is unavailable
         // for new containers.
-        failVolume(v.getHddsRootDir().getPath());
+        failVolume(v.getStorageDir().getPath());
       }
 
       // check failed volume tolerated
       if (!hasEnoughVolumes()) {
         // on startup, we could not try to stop uninitialized services
         if (shutdownHook == null) {
-          DatanodeConfiguration dnConf =
-              conf.getObject(DatanodeConfiguration.class);
           throw new IOException("Don't have enough good volumes on startup,"
               + " bad volumes detected: " + failedVolumes.size()
-              + " max tolerated: " + dnConf.getFailedVolumesTolerated());
+              + " max tolerated: " + maxVolumeFailuresTolerated);
         }
         if (context != null) {
           context.getParent().handleFatalVolumeFailures();
@@ -304,10 +276,14 @@ public class MutableVolumeSet implements VolumeSet {
     // TODO:
     // 1. Consider stopping IO on open containers and tearing down
     //    active pipelines.
-    // 2. Handle Ratis log disk failure.
   }
 
-  public void checkVolumeAsync(HddsVolume volume) {
+  public void checkVolumeAsync(StorageVolume volume) {
+    if (volumeChecker == null) {
+      LOG.debug("No volumeChecker, skip checkVolumeAsync");
+      return;
+    }
+
     volumeChecker.checkVolume(
         volume, (healthyVolumes, failedVolumes) -> {
           if (failedVolumes.size() > 0) {
@@ -321,31 +297,6 @@ public class MutableVolumeSet implements VolumeSet {
   }
 
   /**
-   * If Version file exists and the {@link #clusterID} is not set yet,
-   * assign it the value from Version file. Otherwise, check that the given
-   * id matches with the id from version file.
-   * @param idFromVersionFile value of the property from Version file
-   * @throws InconsistentStorageStateException
-   */
-  private void checkAndSetClusterID(String idFromVersionFile)
-      throws InconsistentStorageStateException {
-    // If the clusterID is null (not set), assign it the value
-    // from version file.
-    if (this.clusterID == null) {
-      this.clusterID = idFromVersionFile;
-      return;
-    }
-
-    // If the clusterID is already set, it should match with the value from the
-    // version file.
-    if (!idFromVersionFile.equals(this.clusterID)) {
-      throw new InconsistentStorageStateException(
-          "Mismatched ClusterIDs. VolumeSet has: " + this.clusterID +
-              ", and version file has: " + idFromVersionFile);
-    }
-  }
-
-  /**
    * Acquire Volume Set Read lock.
    */
   @Override
@@ -377,20 +328,6 @@ public class MutableVolumeSet implements VolumeSet {
     volumeSetRWLock.writeLock().unlock();
   }
 
-
-  private HddsVolume createVolume(String locationString,
-      StorageType storageType) throws IOException {
-    HddsVolume.Builder volumeBuilder = new HddsVolume.Builder(locationString)
-        .conf(conf)
-        .datanodeUuid(datanodeUuid)
-        .clusterID(clusterID)
-        .usageCheckFactory(usageCheckFactory)
-        .storageType(storageType)
-        .volumeSet(this);
-    return volumeBuilder.build();
-  }
-
-
   // Add a volume to VolumeSet
   boolean addVolume(String dataDir) {
     return addVolume(dataDir, StorageType.DEFAULT);
@@ -398,25 +335,25 @@ public class MutableVolumeSet implements VolumeSet {
 
   // Add a volume to VolumeSet
   private boolean addVolume(String volumeRoot, StorageType storageType) {
-    String hddsRoot = HddsVolumeUtil.getHddsRoot(volumeRoot);
     boolean success;
 
     this.writeLock();
     try {
-      if (volumeMap.containsKey(hddsRoot)) {
-        LOG.warn("Volume : {} already exists in VolumeMap", hddsRoot);
+      if (volumeMap.containsKey(volumeRoot)) {
+        LOG.warn("Volume : {} already exists in VolumeMap", volumeRoot);
         success = false;
       } else {
-        if (failedVolumeMap.containsKey(hddsRoot)) {
-          failedVolumeMap.remove(hddsRoot);
+        if (failedVolumeMap.containsKey(volumeRoot)) {
+          failedVolumeMap.remove(volumeRoot);
         }
 
-        HddsVolume hddsVolume = createVolume(volumeRoot, storageType);
-        volumeMap.put(hddsVolume.getHddsRootDir().getPath(), hddsVolume);
-        volumeStateMap.get(hddsVolume.getStorageType()).add(hddsVolume);
+        StorageVolume volume =
+            volumeFactory.createVolume(volumeRoot, storageType);
+        volumeMap.put(volume.getStorageDir().getPath(), volume);
+        volumeStateMap.get(volume.getStorageType()).add(volume);
 
         LOG.info("Added Volume : {} to VolumeSet",
-            hddsVolume.getHddsRootDir().getPath());
+            volume.getStorageDir().getPath());
         success = true;
       }
     } catch (IOException ex) {
@@ -429,24 +366,22 @@ public class MutableVolumeSet implements VolumeSet {
   }
 
   // Mark a volume as failed
-  public void failVolume(String dataDir) {
-    String hddsRoot = HddsVolumeUtil.getHddsRoot(dataDir);
-
+  public void failVolume(String volumeRoot) {
     this.writeLock();
     try {
-      if (volumeMap.containsKey(hddsRoot)) {
-        HddsVolume hddsVolume = volumeMap.get(hddsRoot);
-        hddsVolume.failVolume();
+      if (volumeMap.containsKey(volumeRoot)) {
+        StorageVolume volume = volumeMap.get(volumeRoot);
+        volume.failVolume();
 
-        volumeMap.remove(hddsRoot);
-        volumeStateMap.get(hddsVolume.getStorageType()).remove(hddsVolume);
-        failedVolumeMap.put(hddsRoot, hddsVolume);
+        volumeMap.remove(volumeRoot);
+        volumeStateMap.get(volume.getStorageType()).remove(volume);
+        failedVolumeMap.put(volumeRoot, volume);
 
-        LOG.info("Moving Volume : {} to failed Volumes", hddsRoot);
-      } else if (failedVolumeMap.containsKey(hddsRoot)) {
-        LOG.info("Volume : {} is not active", hddsRoot);
+        LOG.info("Moving Volume : {} to failed Volumes", volumeRoot);
+      } else if (failedVolumeMap.containsKey(volumeRoot)) {
+        LOG.info("Volume : {} is not active", volumeRoot);
       } else {
-        LOG.warn("Volume : {} does not exist in VolumeSet", hddsRoot);
+        LOG.warn("Volume : {} does not exist in VolumeSet", volumeRoot);
       }
     } finally {
       this.writeUnlock();
@@ -454,27 +389,22 @@ public class MutableVolumeSet implements VolumeSet {
   }
 
   // Remove a volume from the VolumeSet completely.
-  public void removeVolume(String dataDir) throws IOException {
-    String hddsRoot = HddsVolumeUtil.getHddsRoot(dataDir);
-
+  public void removeVolume(String volumeRoot) throws IOException {
     this.writeLock();
     try {
-      if (volumeMap.containsKey(hddsRoot)) {
-        HddsVolume hddsVolume = volumeMap.get(hddsRoot);
-        hddsVolume.shutdown();
-
-        volumeMap.remove(hddsRoot);
-        volumeStateMap.get(hddsVolume.getStorageType()).remove(hddsVolume);
+      if (volumeMap.containsKey(volumeRoot)) {
+        StorageVolume volume = volumeMap.get(volumeRoot);
+        volume.shutdown();
 
-        LOG.info("Removed Volume : {} from VolumeSet", hddsRoot);
-      } else if (failedVolumeMap.containsKey(hddsRoot)) {
-        HddsVolume hddsVolume = failedVolumeMap.get(hddsRoot);
-        hddsVolume.setState(VolumeState.NON_EXISTENT);
+        volumeMap.remove(volumeRoot);
+        volumeStateMap.get(volume.getStorageType()).remove(volume);
 
-        failedVolumeMap.remove(hddsRoot);
-        LOG.info("Removed Volume : {} from failed VolumeSet", hddsRoot);
+        LOG.info("Removed Volume : {} from VolumeSet", volumeRoot);
+      } else if (failedVolumeMap.containsKey(volumeRoot)) {
+        failedVolumeMap.remove(volumeRoot);
+        LOG.info("Removed Volume : {} from failed VolumeSet", volumeRoot);
       } else {
-        LOG.warn("Volume : {} does not exist in VolumeSet", hddsRoot);
+        LOG.warn("Volume : {} does not exist in VolumeSet", volumeRoot);
       }
     } finally {
       this.writeUnlock();
@@ -486,11 +416,11 @@ public class MutableVolumeSet implements VolumeSet {
    * thread and write scmUsed on each volume.
    */
   private void saveVolumeSetUsed() {
-    for (HddsVolume hddsVolume : volumeMap.values()) {
+    for (StorageVolume volume : volumeMap.values()) {
       try {
-        hddsVolume.shutdown();
+        volume.shutdown();
       } catch (Exception ex) {
-        LOG.error("Failed to shutdown volume : " + hddsVolume.getHddsRootDir(),
+        LOG.error("Failed to shutdown volume : " + volume.getStorageDir(),
             ex);
       }
     }
@@ -501,71 +431,54 @@ public class MutableVolumeSet implements VolumeSet {
    */
   public void shutdown() {
     saveVolumeSetUsed();
-    stopDiskChecker();
     if (shutdownHook != null) {
       ShutdownHookManager.get().removeShutdownHook(shutdownHook);
     }
   }
 
-  private void stopDiskChecker() {
-    if (periodicDiskChecker != null) {
-      periodicDiskChecker.cancel(true);
-    }
-    if (volumeChecker != null) {
-      volumeChecker.shutdownAndWait(0, TimeUnit.SECONDS);
-    }
-
-    if (diskCheckerservice != null) {
-      diskCheckerservice.shutdownNow();
-    }
-  }
-
   @Override
   @VisibleForTesting
-  public List<HddsVolume> getVolumesList() {
+  public List<StorageVolume> getVolumesList() {
     return ImmutableList.copyOf(volumeMap.values());
   }
 
   @VisibleForTesting
-  public List<HddsVolume> getFailedVolumesList() {
+  public List<StorageVolume> getFailedVolumesList() {
     return ImmutableList.copyOf(failedVolumeMap.values());
   }
 
   @VisibleForTesting
-  public Map<String, HddsVolume> getVolumeMap() {
+  public Map<String, StorageVolume> getVolumeMap() {
     return ImmutableMap.copyOf(volumeMap);
   }
 
   @VisibleForTesting
-  public Map<StorageType, List<HddsVolume>> getVolumeStateMap() {
+  public Map<StorageType, List<StorageVolume>> getVolumeStateMap() {
     return ImmutableMap.copyOf(volumeStateMap);
   }
 
   public boolean hasEnoughVolumes() {
-    DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
-    int maxVolumeFailuresTolerated = dnConf.getFailedVolumesTolerated();
-
-    // Max number of bad volumes allowed, should have at least 1 good volume
+    // Max number of bad volumes allowed, should have at least
+    // 1 good volume
     if (maxVolumeFailuresTolerated ==
-        HddsVolumeChecker.MAX_VOLUME_FAILURE_TOLERATED_LIMIT) {
+        StorageVolumeChecker.MAX_VOLUME_FAILURE_TOLERATED_LIMIT) {
       return getVolumesList().size() >= 1;
     } else {
       return getFailedVolumesList().size() <= maxVolumeFailuresTolerated;
     }
   }
 
-  public StorageLocationReport[] getStorageReport()
-      throws IOException {
+  public StorageLocationReport[] getStorageReport() {
     boolean failed;
     this.readLock();
     try {
       StorageLocationReport[] reports = new StorageLocationReport[volumeMap
           .size() + failedVolumeMap.size()];
       int counter = 0;
-      HddsVolume hddsVolume;
-      for (Map.Entry<String, HddsVolume> entry : volumeMap.entrySet()) {
-        hddsVolume = entry.getValue();
-        VolumeInfo volumeInfo = hddsVolume.getVolumeInfo();
+      StorageVolume volume;
+      for (Map.Entry<String, StorageVolume> entry : volumeMap.entrySet()) {
+        volume = entry.getValue();
+        VolumeInfo volumeInfo = volume.getVolumeInfo();
         long scmUsed;
         long remaining;
         long capacity;
@@ -587,23 +500,24 @@ public class MutableVolumeSet implements VolumeSet {
         StorageLocationReport.Builder builder =
             StorageLocationReport.newBuilder();
         builder.setStorageLocation(volumeInfo.getRootDir())
-            .setId(hddsVolume.getStorageID())
+            .setId(volume.getStorageID())
             .setFailed(failed)
             .setCapacity(capacity)
             .setRemaining(remaining)
             .setScmUsed(scmUsed)
-            .setStorageType(hddsVolume.getStorageType());
+            .setStorageType(volume.getStorageType());
         StorageLocationReport r = builder.build();
         reports[counter++] = r;
       }
-      for (Map.Entry<String, HddsVolume> entry : failedVolumeMap.entrySet()) {
-        hddsVolume = entry.getValue();
+      for (Map.Entry<String, StorageVolume> entry
+          : failedVolumeMap.entrySet()) {
+        volume = entry.getValue();
         StorageLocationReport.Builder builder = StorageLocationReport
             .newBuilder();
-        builder.setStorageLocation(hddsVolume.getHddsRootDir()
-            .getAbsolutePath()).setId(hddsVolume.getStorageID()).setFailed(true)
+        builder.setStorageLocation(volume.getStorageDir()
+            .getAbsolutePath()).setId(volume.getStorageID()).setFailed(true)
             .setCapacity(0).setRemaining(0).setScmUsed(0).setStorageType(
-            hddsVolume.getStorageType());
+            volume.getStorageType());
         StorageLocationReport r = builder.build();
         reports[counter++] = r;
       }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
new file mode 100644
index 0000000..bd86b5d
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
+import org.apache.hadoop.util.DiskChecker;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * StorageVolume represents a generic Volume in datanode, could be
+ * 1. HddsVolume for container storage.
+ * 2. MetadataVolume for metadata(ratis) storage.
+ */
+public abstract class StorageVolume
+    implements Checkable<Boolean, VolumeCheckResult> {
+
+  /**
+   * Type for StorageVolume.
+   */
+  public enum VolumeType {
+    DATA_VOLUME,
+    META_VOLUME
+  }
+
+  private final File storageDir;
+
+  private final VolumeInfo volumeInfo;
+
+  private final VolumeSet volumeSet;
+
+  protected StorageVolume(Builder<?> b) throws IOException {
+    if (!b.failedVolume) {
+      StorageLocation location = StorageLocation.parse(b.volumeRootStr);
+      storageDir = new File(location.getUri().getPath(), b.storageDirStr);
+      this.volumeInfo = new VolumeInfo.Builder(b.volumeRootStr, b.conf)
+          .storageType(b.storageType)
+          .usageCheckFactory(b.usageCheckFactory)
+          .build();
+      this.volumeSet = b.volumeSet;
+    } else {
+      storageDir = new File(b.volumeRootStr);
+      this.volumeInfo = null;
+      this.volumeSet = null;
+    }
+  }
+
+  /**
+   * Builder class for StorageVolume.
+   * @param <T> subclass Builder
+   */
+  public abstract static class Builder<T extends Builder<T>> {
+    private final String volumeRootStr;
+    private String storageDirStr;
+    private ConfigurationSource conf;
+    private StorageType storageType;
+    private SpaceUsageCheckFactory usageCheckFactory;
+    private VolumeSet volumeSet;
+    private boolean failedVolume = false;
+
+    public Builder(String volumeRootStr, String storageDirStr) {
+      this.volumeRootStr = volumeRootStr;
+      this.storageDirStr = storageDirStr;
+    }
+
+    public abstract T getThis();
+
+    public T conf(ConfigurationSource config) {
+      this.conf = config;
+      return this.getThis();
+    }
+
+    public T storageType(StorageType st) {
+      this.storageType = st;
+      return this.getThis();
+    }
+
+    public T usageCheckFactory(SpaceUsageCheckFactory factory) {
+      this.usageCheckFactory = factory;
+      return this.getThis();
+    }
+
+    public T volumeSet(VolumeSet volSet) {
+      this.volumeSet = volSet;
+      return this.getThis();
+    }
+
+    // This is added just to create failed volume objects, which will be used
+    // to create failed StorageVolume objects in the case of any exceptions
+    // caused during creating StorageVolume object.
+    public T failedVolume(boolean failed) {
+      this.failedVolume = failed;
+      return this.getThis();
+    }
+
+    public abstract StorageVolume build() throws IOException;
+
+    public String getVolumeRootStr() {
+      return this.volumeRootStr;
+    }
+
+    public boolean getFailedVolume() {
+      return this.failedVolume;
+    }
+
+    public StorageType getStorageType() {
+      return this.storageType;
+    }
+  }
+
+  public long getCapacity() {
+    return volumeInfo != null ? volumeInfo.getCapacity() : 0;
+  }
+
+  public long getAvailable() {
+    return volumeInfo != null ? volumeInfo.getAvailable() : 0;
+  }
+
+  public long getUsedSpace() {
+    return volumeInfo != null ? volumeInfo.getScmUsed() : 0;
+  }
+
+  public File getStorageDir() {
+    return this.storageDir;
+  }
+
+  public VolumeInfo getVolumeInfo() {
+    return this.volumeInfo;
+  }
+
+  public VolumeSet getVolumeSet() {
+    return this.volumeSet;
+  }
+
+  public StorageType getStorageType() {
+    if(this.volumeInfo != null) {
+      return this.volumeInfo.getStorageType();
+    }
+    return StorageType.DEFAULT;
+  }
+
+  public String getStorageID() {
+    return "";
+  }
+
+  public void failVolume() {
+    if (volumeInfo != null) {
+      volumeInfo.shutdownUsageThread();
+    }
+  }
+
+  public void shutdown() {
+    if (volumeInfo != null) {
+      volumeInfo.shutdownUsageThread();
+    }
+  }
+
+  /**
+   * Run a check on the current volume to determine if it is healthy.
+   * @param unused context for the check, ignored.
+   * @return result of checking the volume.
+   * @throws Exception if an exception was encountered while running
+   *            the volume check.
+   */
+  @Override
+  public VolumeCheckResult check(@Nullable Boolean unused) throws Exception {
+    if (!storageDir.exists()) {
+      return VolumeCheckResult.FAILED;
+    }
+    DiskChecker.checkDir(storageDir);
+    return VolumeCheckResult.HEALTHY;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(storageDir);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    return this == other
+        || other instanceof StorageVolume
+        && ((StorageVolume) other).storageDir.equals(this.storageDir);
+  }
+
+  @Override
+  public String toString() {
+    return getStorageDir().toString();
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolumeChecker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeChecker.java
similarity index 77%
rename from hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolumeChecker.java
rename to hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeChecker.java
index 8eaf299..1e890aa 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolumeChecker.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeChecker.java
@@ -20,25 +20,26 @@ package org.apache.hadoop.ozone.container.common.volume;
 
 import javax.annotation.Nonnull;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Timer;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -52,16 +53,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A class that encapsulates running disk checks against each HDDS volume and
+ * A class that encapsulates running disk checks against each volume and
  * allows retrieving a list of failed volumes. The class only detects failed
  * volumes and handling of failed volumes is responsibility of caller.
  */
-public class HddsVolumeChecker {
+public class StorageVolumeChecker {
 
   public static final int MAX_VOLUME_FAILURE_TOLERATED_LIMIT = -1;
 
   public static final Logger LOG =
-      LoggerFactory.getLogger(HddsVolumeChecker.class);
+      LoggerFactory.getLogger(StorageVolumeChecker.class);
 
   private AsyncChecker<Boolean, VolumeCheckResult> delegateChecker;
 
@@ -90,65 +91,33 @@ public class HddsVolumeChecker {
   private final ExecutorService checkVolumeResultHandlerExecutorService;
 
   /**
+   * An executor for periodic disk checks.
+   */
+  private final ScheduledExecutorService diskCheckerservice;
+  private final ScheduledFuture<?> periodicDiskChecker;
+
+  private List<MutableVolumeSet> registeredVolumeSets;
+
+  /**
    * @param conf  Configuration object.
    * @param timer {@link Timer} object used for throttling checks.
    */
-  public HddsVolumeChecker(ConfigurationSource conf, Timer timer)
-      throws DiskErrorException {
-    maxAllowedTimeForCheckMs = conf.getTimeDuration(
-        DFSConfigKeysLegacy.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
-        DFSConfigKeysLegacy.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
-        TimeUnit.MILLISECONDS);
-
-    if (maxAllowedTimeForCheckMs <= 0) {
-      throw new DiskErrorException("Invalid value configured for "
-          + DFSConfigKeysLegacy.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY + " - "
-          + maxAllowedTimeForCheckMs + " (should be > 0)");
-    }
+  public StorageVolumeChecker(ConfigurationSource conf, Timer timer) {
 
     this.timer = timer;
 
     DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
-    /**
-     * Maximum number of volume failures that can be tolerated without
-     * declaring a fatal error.
-     */
-    int maxVolumeFailuresTolerated = dnConf.getFailedVolumesTolerated();
-
-    minDiskCheckGapMs = conf.getTimeDuration(
-        DFSConfigKeysLegacy.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
-        DFSConfigKeysLegacy.DFS_DATANODE_DISK_CHECK_MIN_GAP_DEFAULT,
-        TimeUnit.MILLISECONDS);
-
-    if (minDiskCheckGapMs < 0) {
-      throw new DiskErrorException("Invalid value configured for "
-          + DFSConfigKeysLegacy.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY + " - "
-          + minDiskCheckGapMs + " (should be >= 0)");
-    }
 
-    long diskCheckTimeout = conf.getTimeDuration(
-        DFSConfigKeysLegacy.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
-        DFSConfigKeysLegacy.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
-        TimeUnit.MILLISECONDS);
+    maxAllowedTimeForCheckMs = dnConf.getDiskCheckTimeout().toMillis();
 
-    if (diskCheckTimeout < 0) {
-      throw new DiskErrorException("Invalid value configured for "
-          + DFSConfigKeysLegacy.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY + " - "
-          + diskCheckTimeout + " (should be >= 0)");
-    }
+    minDiskCheckGapMs = dnConf.getDiskCheckMinGap().toMillis();
 
     lastAllVolumesCheck = timer.monotonicNow() - minDiskCheckGapMs;
 
-    if (maxVolumeFailuresTolerated < MAX_VOLUME_FAILURE_TOLERATED_LIMIT) {
-      throw new DiskErrorException("Invalid value configured for "
-          + DatanodeConfiguration.FAILED_VOLUMES_TOLERATED_KEY
-          + " - "
-          + maxVolumeFailuresTolerated + " "
-          + DataNode.MAX_VOLUME_FAILURES_TOLERATED_MSG);
-    }
+    registeredVolumeSets = new ArrayList<>();
 
     delegateChecker = new ThrottledAsyncChecker<>(
-        timer, minDiskCheckGapMs, diskCheckTimeout,
+        timer, minDiskCheckGapMs, maxAllowedTimeForCheckMs,
         Executors.newCachedThreadPool(
             new ThreadFactoryBuilder()
                 .setNameFormat("DataNode DiskChecker thread %d")
@@ -160,10 +129,38 @@ public class HddsVolumeChecker {
             .setNameFormat("VolumeCheck ResultHandler thread %d")
             .setDaemon(true)
             .build());
+
+    this.diskCheckerservice = Executors.newScheduledThreadPool(
+        1, r -> {
+          Thread t = new Thread(r, "Periodic HDDS volume checker");
+          t.setDaemon(true);
+          return t;
+        });
+
+    long periodicDiskCheckIntervalMinutes =
+        dnConf.getPeriodicDiskCheckIntervalMinutes();
+    this.periodicDiskChecker =
+        diskCheckerservice.scheduleWithFixedDelay(this::checkAllVolumeSets,
+            periodicDiskCheckIntervalMinutes, periodicDiskCheckIntervalMinutes,
+            TimeUnit.MINUTES);
+  }
+
+  public synchronized void registerVolumeSet(MutableVolumeSet volumeSet) {
+    registeredVolumeSets.add(volumeSet);
+  }
+
+  public synchronized void checkAllVolumeSets() {
+    try {
+      for (MutableVolumeSet volSet : registeredVolumeSets) {
+        volSet.checkAllVolumes();
+      }
+    } catch (IOException e) {
+      LOG.warn("Exception while checking disks", e);
+    }
   }
 
   /**
-   * Run checks against all HDDS volumes.
+   * Run checks against all volumes.
    * <p>
    * This check may be performed at service startup and subsequently at
    * regular intervals to detect and handle failed volumes.
@@ -173,7 +170,8 @@ public class HddsVolumeChecker {
    *                unexpected.
    * @return set of failed volumes.
    */
-  public Set<HddsVolume> checkAllVolumes(Collection<HddsVolume> volumes)
+  public Set<? extends StorageVolume> checkAllVolumes(
+      Collection<? extends StorageVolume> volumes)
       throws InterruptedException {
     final long gap = timer.monotonicNow() - lastAllVolumesCheck;
     if (gap < minDiskCheckGapMs) {
@@ -188,14 +186,14 @@ public class HddsVolumeChecker {
     }
 
     lastAllVolumesCheck = timer.monotonicNow();
-    final Set<HddsVolume> healthyVolumes = new HashSet<>();
-    final Set<HddsVolume> failedVolumes = new HashSet<>();
-    final Set<HddsVolume> allVolumes = new HashSet<>();
+    final Set<StorageVolume> healthyVolumes = new HashSet<>();
+    final Set<StorageVolume> failedVolumes = new HashSet<>();
+    final Set<StorageVolume> allVolumes = new HashSet<>();
 
     final AtomicLong numVolumes = new AtomicLong(volumes.size());
     final CountDownLatch latch = new CountDownLatch(1);
 
-    for (HddsVolume v : volumes) {
+    for (StorageVolume v : volumes) {
       Optional<ListenableFuture<VolumeCheckResult>> olf =
           delegateChecker.schedule(v, null);
       LOG.info("Scheduled health check for volume {}", v);
@@ -239,8 +237,8 @@ public class HddsVolumeChecker {
      * @param healthyVolumes set of volumes that passed disk checks.
      * @param failedVolumes  set of volumes that failed disk checks.
      */
-    void call(Set<HddsVolume> healthyVolumes,
-        Set<HddsVolume> failedVolumes) throws IOException;
+    void call(Set<StorageVolume> healthyVolumes,
+        Set<StorageVolume> failedVolumes) throws IOException;
   }
 
   /**
@@ -255,7 +253,7 @@ public class HddsVolumeChecker {
    * @return true if the check was scheduled and the callback will be invoked.
    * false otherwise.
    */
-  public boolean checkVolume(final HddsVolume volume, Callback callback) {
+  public boolean checkVolume(final StorageVolume volume, Callback callback) {
     if (volume == null) {
       LOG.debug("Cannot schedule check on null volume");
       return false;
@@ -280,9 +278,9 @@ public class HddsVolumeChecker {
    */
   private class ResultHandler
       implements FutureCallback<VolumeCheckResult> {
-    private final HddsVolume volume;
-    private final Set<HddsVolume> failedVolumes;
-    private final Set<HddsVolume> healthyVolumes;
+    private final StorageVolume volume;
+    private final Set<StorageVolume> failedVolumes;
+    private final Set<StorageVolume> healthyVolumes;
     private final AtomicLong volumeCounter;
 
     @Nullable
@@ -296,9 +294,9 @@ public class HddsVolumeChecker {
      * @param volumeCounter  volumeCounter used to trigger callback invocation.
      * @param callback       invoked when the volumeCounter reaches 0.
      */
-    ResultHandler(HddsVolume volume,
-        Set<HddsVolume> healthyVolumes,
-        Set<HddsVolume> failedVolumes,
+    ResultHandler(StorageVolume volume,
+        Set<StorageVolume> healthyVolumes,
+        Set<StorageVolume> failedVolumes,
         AtomicLong volumeCounter,
         @Nullable Callback callback) {
       this.volume = volume;
@@ -347,13 +345,13 @@ public class HddsVolumeChecker {
     }
 
     private void markHealthy() {
-      synchronized (HddsVolumeChecker.this) {
+      synchronized (StorageVolumeChecker.this) {
         healthyVolumes.add(volume);
       }
     }
 
     private void markFailed() {
-      synchronized (HddsVolumeChecker.this) {
+      synchronized (StorageVolumeChecker.this) {
         failedVolumes.add(volume);
       }
     }
@@ -381,7 +379,9 @@ public class HddsVolumeChecker {
    * See {@link ExecutorService#awaitTermination} for the interpretation
    * of the parameters.
    */
-  void shutdownAndWait(int gracePeriod, TimeUnit timeUnit) {
+  public void shutdownAndWait(int gracePeriod, TimeUnit timeUnit) {
+    periodicDiskChecker.cancel(true);
+    diskCheckerservice.shutdownNow();
     try {
       delegateChecker.shutdownAndWait(gracePeriod, timeUnit);
     } catch (InterruptedException e) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeFactory.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeFactory.java
new file mode 100644
index 0000000..9273f35
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory;
+
+import java.io.IOException;
+
+/**
+ * A factory class for StorageVolume.
+ */
+public abstract class StorageVolumeFactory {
+
+  private ConfigurationSource conf;
+  private SpaceUsageCheckFactory usageCheckFactory;
+  private MutableVolumeSet volumeSet;
+
+  public StorageVolumeFactory(ConfigurationSource conf,
+      SpaceUsageCheckFactory usageCheckFactory, MutableVolumeSet volumeSet) {
+    this.conf = conf;
+    this.usageCheckFactory = usageCheckFactory;
+    this.volumeSet = volumeSet;
+  }
+
+  public ConfigurationSource getConf() {
+    return conf;
+  }
+
+  public SpaceUsageCheckFactory getUsageCheckFactory() {
+    return usageCheckFactory;
+  }
+
+  public VolumeSet getVolumeSet() {
+    return this.volumeSet;
+  }
+
+  abstract StorageVolume createVolume(String locationString,
+      StorageType storageType) throws IOException;
+
+  abstract StorageVolume createFailedVolume(String locationString)
+      throws IOException;
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
index 883ef46..5a4af0c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
@@ -22,8 +22,8 @@ import org.apache.hadoop.ozone.lock.ReadWriteLockable;
 import java.util.List;
 
 /**
- * Set of HDDS volumes.
+ * Set of volumes.
  */
 public interface VolumeSet extends ReadWriteLockable {
-  List<HddsVolume> getVolumesList();
+  List<StorageVolume> getVolumesList();
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index ffa869c..87bed8c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
 import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
@@ -64,7 +65,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Res
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.ERROR_IN_DB_SYNC;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_CONTAINER_STATE;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
-import static org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil.onFailure;
+import static org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil.onFailure;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,8 +111,9 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
     long maxSize = containerData.getMaxSize();
     volumeSet.readLock();
     try {
-      HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
-          .getVolumesList(), maxSize);
+      HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(
+          StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()),
+          maxSize);
       String hddsVolumeDir = containerVolume.getHddsRootDir().toString();
 
       long containerID = containerData.getContainerID();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index c5aea84..163556f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
@@ -287,8 +288,9 @@ public class KeyValueHandler extends Handler {
       throws IOException {
     volumeSet.readLock();
     try {
-      HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
-          .getVolumesList(), container.getContainerData().getMaxSize());
+      HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(
+          StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()),
+          container.getContainerData().getMaxSize());
       String hddsVolumeDir = containerVolume.getHddsRootDir().toString();
       container.populatePathFields(clusterId, containerVolume, hddsVolumeDir);
     } finally {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java
index 8749de3..6339234 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNABLE_TO_READ_METADATA_DB;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID;
-import static org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil.onFailure;
+import static org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil.onFailure;
 
 /**
  * Utils functions to help block functions.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
index 1178127..773b36a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
@@ -55,7 +55,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Res
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_ALGORITHM;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNABLE_TO_FIND_CHUNK;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
-import static org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil.onFailure;
+import static org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil.onFailure;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
index 715ce4c..3afcdf3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
@@ -58,7 +58,7 @@ import java.util.concurrent.ExecutionException;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
 import static org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion.FILE_PER_BLOCK;
 import static org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage.COMMIT_DATA;
-import static org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil.onFailure;
+import static org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil.onFailure;
 import static org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils.limitReadSize;
 import static org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils.validateChunkForOverwrite;
 import static org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils.verifyChunkFileExists;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 2312c0b..81ce1d2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -54,6 +54,9 @@ import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSp
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume.VolumeType;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolumeChecker;
 import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
 import org.apache.hadoop.ozone.container.replication.ReplicationServer;
 import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
@@ -64,6 +67,7 @@ import com.google.common.collect.Maps;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
 
+import org.apache.hadoop.util.Timer;
 import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,6 +85,8 @@ public class OzoneContainer {
   private final Map<ContainerType, Handler> handlers;
   private final ConfigurationSource config;
   private final MutableVolumeSet volumeSet;
+  private final MutableVolumeSet metaVolumeSet;
+  private final StorageVolumeChecker volumeChecker;
   private final ContainerSet containerSet;
   private final XceiverServerSpi writeChannel;
   private final XceiverServerSpi readChannel;
@@ -115,9 +121,14 @@ public class OzoneContainer {
     config = conf;
     this.datanodeDetails = datanodeDetails;
     this.context = context;
+    this.volumeChecker = getVolumeChecker(conf);
+
     volumeSet = new MutableVolumeSet(datanodeDetails.getUuidString(), conf,
-        context);
+        context, VolumeType.DATA_VOLUME, volumeChecker);
     volumeSet.setFailedVolumeListener(this::handleVolumeFailures);
+    metaVolumeSet = new MutableVolumeSet(datanodeDetails.getUuidString(), conf,
+        context, VolumeType.META_VOLUME, volumeChecker);
+
     containerSet = new ContainerSet();
     metadataScanner = null;
 
@@ -194,14 +205,12 @@ public class OzoneContainer {
     return tlsClientConfig;
   }
 
-
-
   /**
    * Build's container map.
    */
   public static void buildContainerSet(MutableVolumeSet volumeSet,
         ContainerSet containerSet, ConfigurationSource config) {
-    Iterator<HddsVolume> volumeSetIterator = volumeSet.getVolumesList()
+    Iterator<StorageVolume> volumeSetIterator = volumeSet.getVolumesList()
         .iterator();
     ArrayList<Thread> volumeThreads = new ArrayList<>();
     long startTime = System.currentTimeMillis();
@@ -209,9 +218,9 @@ public class OzoneContainer {
     //TODO: diskchecker should be run before this, to see how disks are.
     // And also handle disk failure tolerance need to be added
     while (volumeSetIterator.hasNext()) {
-      HddsVolume volume = volumeSetIterator.next();
-      Thread thread = new Thread(new ContainerReader(volumeSet, volume,
-          containerSet, config));
+      StorageVolume volume = volumeSetIterator.next();
+      Thread thread = new Thread(new ContainerReader(volumeSet,
+          (HddsVolume) volume, containerSet, config));
       thread.start();
       volumeThreads.add(thread);
     }
@@ -246,8 +255,9 @@ public class OzoneContainer {
       this.metadataScanner.start();
 
       dataScanners = new ArrayList<>();
-      for (HddsVolume v : volumeSet.getVolumesList()) {
-        ContainerDataScanner s = new ContainerDataScanner(c, controller, v);
+      for (StorageVolume v : volumeSet.getVolumesList()) {
+        ContainerDataScanner s = new ContainerDataScanner(c, controller,
+            (HddsVolume) v);
         s.start();
         dataScanners.add(s);
       }
@@ -320,7 +330,9 @@ public class OzoneContainer {
     readChannel.stop();
     this.handlers.values().forEach(Handler::stop);
     hddsDispatcher.shutdown();
+    volumeChecker.shutdownAndWait(0, TimeUnit.SECONDS);
     volumeSet.shutdown();
+    metaVolumeSet.shutdown();
     blockDeletingService.shutdown();
     ContainerMetrics.remove();
   }
@@ -373,11 +385,11 @@ public class OzoneContainer {
     for (int i = 0; i < reports.length; i++) {
       nrb.addStorageReport(reports[i].getProtoBufMessage());
     }
-    List<StorageContainerDatanodeProtocolProtos.
-            MetadataStorageReportProto> metadataReport =
-            writeChannel.getStorageReport();
-    if (metadataReport != null) {
-      nrb.addAllMetadataStorageReport(metadataReport);
+
+    StorageLocationReport[] metaReports = metaVolumeSet.getStorageReport();
+    for (int i = 0; i < metaReports.length; i++) {
+      nrb.addMetadataStorageReport(
+          metaReports[i].getMetadataProtoBufMessage());
     }
     return nrb.build();
   }
@@ -391,4 +403,13 @@ public class OzoneContainer {
     return volumeSet;
   }
 
+  public MutableVolumeSet getMetaVolumeSet() {
+    return metaVolumeSet;
+  }
+
+  @VisibleForTesting
+  StorageVolumeChecker getVolumeChecker(ConfigurationSource conf) {
+    return new StorageVolumeChecker(conf, new Timer());
+  }
+
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
index 0b918f3..347961a 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeoutException;
 
 import com.google.common.collect.Lists;
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.MutableConfigurationSource;
@@ -59,6 +60,7 @@ import org.apache.hadoop.ozone.container.common.transport.server.ratis.Dispatche
 import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
@@ -160,8 +162,10 @@ public class TestBlockDeletingService {
     clusterID = UUID.randomUUID().toString();
     conf = new OzoneConfiguration();
     conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, testRoot.getAbsolutePath());
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testRoot.getAbsolutePath());
     datanodeUuid = UUID.randomUUID().toString();
-    volumeSet = new MutableVolumeSet(datanodeUuid, conf, null);
+    volumeSet = new MutableVolumeSet(datanodeUuid, conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
   }
 
   @AfterClass
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
index cd744e0..700c6c2 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.container.common;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
@@ -34,6 +35,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
@@ -112,6 +114,11 @@ public class TestSchemaOneBackwardsCompatibility {
 
     metadataDir = tempMetadataDir;
     containerFile = new File(metadataDir, TestDB.CONTAINER_FILE_NAME);
+
+    conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
+        metadataDir.getAbsolutePath());
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
+        metadataDir.getAbsolutePath());
   }
 
   /**
@@ -238,7 +245,8 @@ public class TestSchemaOneBackwardsCompatibility {
     final long numBlocksToDelete = TestDB.NUM_PENDING_DELETION_BLOCKS;
     String datanodeUuid = UUID.randomUUID().toString();
     ContainerSet containerSet = makeContainerSet();
-    VolumeSet volumeSet = new MutableVolumeSet(datanodeUuid, conf, null);
+    VolumeSet volumeSet = new MutableVolumeSet(datanodeUuid, conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
     ContainerMetrics metrics = ContainerMetrics.create(conf);
     KeyValueHandler keyValueHandler =
         new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
@@ -305,7 +313,8 @@ public class TestSchemaOneBackwardsCompatibility {
   public void testReadDeletedBlockChunkInfo() throws Exception {
     String datanodeUuid = UUID.randomUUID().toString();
     ContainerSet containerSet = makeContainerSet();
-    VolumeSet volumeSet = new MutableVolumeSet(datanodeUuid, conf, null);
+    VolumeSet volumeSet = new MutableVolumeSet(datanodeUuid, conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
     ContainerMetrics metrics = ContainerMetrics.create(conf);
     KeyValueHandler keyValueHandler =
         new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
@@ -499,8 +508,6 @@ public class TestSchemaOneBackwardsCompatibility {
       throws Exception {
     conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10);
     conf.setInt(OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 2);
-    conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
-        metadataDir.getAbsolutePath());
 
     OzoneContainer container = makeMockOzoneContainer(keyValueHandler);
 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index 873b373..5e80a7e 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChecksumData;
@@ -52,6 +53,7 @@ import org.apache.hadoop.ozone.container.common.transport.server.ratis.Dispatche
 import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
@@ -130,6 +132,7 @@ public class TestContainerPersistence {
     hddsPath = GenericTestUtils
         .getTempPath(TestContainerPersistence.class.getSimpleName());
     conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, hddsPath);
+    conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, hddsPath);
     volumeChoosingPolicy = new RoundRobinVolumeChoosingPolicy();
   }
 
@@ -141,7 +144,8 @@ public class TestContainerPersistence {
   @Before
   public void setupPaths() throws IOException {
     containerSet = new ContainerSet();
-    volumeSet = new MutableVolumeSet(DATANODE_UUID, conf, null);
+    volumeSet = new MutableVolumeSet(DATANODE_UUID, conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
     blockManager = new BlockManagerImpl(conf);
     chunkManager = ChunkManagerFactory.createChunkManager(conf, blockManager,
         null);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
index 06a31f9..618dd62 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerAction;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
@@ -46,6 +47,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachin
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
@@ -100,9 +102,10 @@ public class TestHddsDispatcher {
         TestHddsDispatcher.class.getSimpleName());
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.set(HDDS_DATANODE_DIR_KEY, testDir);
+    conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDir);
     DatanodeDetails dd = randomDatanodeDetails();
     MutableVolumeSet volumeSet = new MutableVolumeSet(dd.getUuidString(), conf,
-        null);
+        null, StorageVolume.VolumeType.DATA_VOLUME, null);
 
     try {
       UUID scmId = UUID.randomUUID();
@@ -163,6 +166,7 @@ public class TestHddsDispatcher {
       UUID scmId = UUID.randomUUID();
       OzoneConfiguration conf = new OzoneConfiguration();
       conf.set(HDDS_DATANODE_DIR_KEY, testDir);
+      conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDir);
       DatanodeDetails dd = randomDatanodeDetails();
       HddsDispatcher hddsDispatcher = createDispatcher(dd, scmId, conf);
       ContainerCommandRequestProto writeChunkRequest =
@@ -198,6 +202,7 @@ public class TestHddsDispatcher {
       UUID scmId = UUID.randomUUID();
       OzoneConfiguration conf = new OzoneConfiguration();
       conf.set(HDDS_DATANODE_DIR_KEY, testDir);
+      conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDir);
       DatanodeDetails dd = randomDatanodeDetails();
       HddsDispatcher hddsDispatcher = createDispatcher(dd, scmId, conf);
       ContainerCommandRequestProto writeChunkRequest =
@@ -239,6 +244,7 @@ public class TestHddsDispatcher {
       UUID scmId = UUID.randomUUID();
       OzoneConfiguration conf = new OzoneConfiguration();
       conf.set(HDDS_DATANODE_DIR_KEY, testDir);
+      conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDir);
       DatanodeDetails dd = randomDatanodeDetails();
       HddsDispatcher hddsDispatcher = createDispatcher(dd, scmId, conf);
       ContainerCommandRequestProto writeChunkRequest = getWriteChunkRequest(
@@ -278,7 +284,8 @@ public class TestHddsDispatcher {
   private HddsDispatcher createDispatcher(DatanodeDetails dd, UUID scmId,
       OzoneConfiguration conf) throws IOException {
     ContainerSet containerSet = new ContainerSet();
-    VolumeSet volumeSet = new MutableVolumeSet(dd.getUuidString(), conf, null);
+    VolumeSet volumeSet = new MutableVolumeSet(dd.getUuidString(), conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
     DatanodeStateMachine stateMachine = Mockito.mock(
         DatanodeStateMachine.class);
     StateContext context = Mockito.mock(StateContext.class);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java
index c0cc28a..1b42654 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java
@@ -20,13 +20,20 @@ package org.apache.hadoop.ozone.container.common.statemachine;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.junit.Test;
 
+import java.util.concurrent.TimeUnit;
+
 import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.CONTAINER_DELETE_THREADS_DEFAULT;
 import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.CONTAINER_DELETE_THREADS_MAX_KEY;
+import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_MIN_GAP_KEY;
+import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_MIN_GAP_DEFAULT;
+import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_TIMEOUT_KEY;
 import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.REPLICATION_MAX_STREAMS_DEFAULT;
 import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.REPLICATION_STREAMS_LIMIT_KEY;
 import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY;
 import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT;
-import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.FAILED_VOLUMES_TOLERATED_KEY;
+import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.FAILED_DATA_VOLUMES_TOLERATED_KEY;
+import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.FAILED_METADATA_VOLUMES_TOLERATED_KEY;
 import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.FAILED_VOLUMES_TOLERATED_DEFAULT;
 
 import static org.junit.Assert.assertEquals;
@@ -43,12 +50,21 @@ public class TestDatanodeConfiguration {
     int validDeleteThreads = 42;
     long validDiskCheckIntervalMinutes = 60;
     int validFailedVolumesTolerated = 10;
+    long validDiskCheckMinGap = 2;
+    long validDiskCheckTimeout = 1;
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, validReplicationLimit);
     conf.setInt(CONTAINER_DELETE_THREADS_MAX_KEY, validDeleteThreads);
     conf.setLong(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY,
         validDiskCheckIntervalMinutes);
-    conf.setInt(FAILED_VOLUMES_TOLERATED_KEY, validFailedVolumesTolerated);
+    conf.setInt(FAILED_DATA_VOLUMES_TOLERATED_KEY,
+        validFailedVolumesTolerated);
+    conf.setInt(FAILED_METADATA_VOLUMES_TOLERATED_KEY,
+        validFailedVolumesTolerated);
+    conf.setTimeDuration(DISK_CHECK_MIN_GAP_KEY,
+        validDiskCheckMinGap, TimeUnit.MINUTES);
+    conf.setTimeDuration(DISK_CHECK_TIMEOUT_KEY,
+        validDiskCheckTimeout, TimeUnit.MINUTES);
 
     // WHEN
     DatanodeConfiguration subject = conf.getObject(DatanodeConfiguration.class);
@@ -59,7 +75,13 @@ public class TestDatanodeConfiguration {
     assertEquals(validDiskCheckIntervalMinutes,
         subject.getPeriodicDiskCheckIntervalMinutes());
     assertEquals(validFailedVolumesTolerated,
-        subject.getFailedVolumesTolerated());
+        subject.getFailedDataVolumesTolerated());
+    assertEquals(validFailedVolumesTolerated,
+        subject.getFailedMetadataVolumesTolerated());
+    assertEquals(validDiskCheckMinGap,
+        subject.getDiskCheckMinGap().toMinutes());
+    assertEquals(validDiskCheckTimeout,
+        subject.getDiskCheckTimeout().toMinutes());
   }
 
   @Test
@@ -69,12 +91,21 @@ public class TestDatanodeConfiguration {
     int invalidDeleteThreads = 0;
     long invalidDiskCheckIntervalMinutes = -1;
     int invalidFailedVolumesTolerated = -2;
+    long invalidDiskCheckMinGap = -1;
+    long invalidDiskCheckTimeout = -1;
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, invalidReplicationLimit);
     conf.setInt(CONTAINER_DELETE_THREADS_MAX_KEY, invalidDeleteThreads);
     conf.setLong(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY,
         invalidDiskCheckIntervalMinutes);
-    conf.setInt(FAILED_VOLUMES_TOLERATED_KEY, invalidFailedVolumesTolerated);
+    conf.setInt(FAILED_DATA_VOLUMES_TOLERATED_KEY,
+        invalidFailedVolumesTolerated);
+    conf.setInt(FAILED_METADATA_VOLUMES_TOLERATED_KEY,
+        invalidFailedVolumesTolerated);
+    conf.setTimeDuration(DISK_CHECK_MIN_GAP_KEY,
+        invalidDiskCheckMinGap, TimeUnit.MINUTES);
+    conf.setTimeDuration(DISK_CHECK_TIMEOUT_KEY,
+        invalidDiskCheckTimeout, TimeUnit.MINUTES);
 
     // WHEN
     DatanodeConfiguration subject = conf.getObject(DatanodeConfiguration.class);
@@ -87,7 +118,13 @@ public class TestDatanodeConfiguration {
     assertEquals(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT,
         subject.getPeriodicDiskCheckIntervalMinutes());
     assertEquals(FAILED_VOLUMES_TOLERATED_DEFAULT,
-        subject.getFailedVolumesTolerated());
+        subject.getFailedDataVolumesTolerated());
+    assertEquals(FAILED_VOLUMES_TOLERATED_DEFAULT,
+        subject.getFailedMetadataVolumesTolerated());
+    assertEquals(DISK_CHECK_MIN_GAP_DEFAULT,
+        subject.getDiskCheckMinGap().toMillis());
+    assertEquals(DISK_CHECK_TIMEOUT_DEFAULT,
+        subject.getDiskCheckTimeout().toMillis());
   }
 
   @Test
@@ -106,7 +143,13 @@ public class TestDatanodeConfiguration {
     assertEquals(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT,
         subject.getPeriodicDiskCheckIntervalMinutes());
     assertEquals(FAILED_VOLUMES_TOLERATED_DEFAULT,
-        subject.getFailedVolumesTolerated());
+        subject.getFailedDataVolumesTolerated());
+    assertEquals(FAILED_VOLUMES_TOLERATED_DEFAULT,
+        subject.getFailedMetadataVolumesTolerated());
+    assertEquals(DISK_CHECK_MIN_GAP_DEFAULT,
+        subject.getDiskCheckMinGap().toMillis());
+    assertEquals(DISK_CHECK_TIMEOUT_DEFAULT,
+        subject.getDiskCheckTimeout().toMillis());
   }
 
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolumeChecker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeChecker.java
similarity index 89%
rename from hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolumeChecker.java
rename to hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeChecker.java
index 0e50dc3..3708235 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolumeChecker.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeChecker.java
@@ -22,7 +22,6 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -34,6 +33,7 @@ import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
 import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.ozone.test.GenericTestUtils;
@@ -55,6 +55,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -72,12 +73,12 @@ import static org.mockito.Mockito.*;
 
 
 /**
- * Tests for {@link HddsVolumeChecker}.
+ * Tests for {@link StorageVolumeChecker}.
  */
 @RunWith(Parameterized.class)
-public class TestHddsVolumeChecker {
+public class TestStorageVolumeChecker {
   public static final Logger LOG = LoggerFactory.getLogger(
-      TestHddsVolumeChecker.class);
+      TestStorageVolumeChecker.class);
 
   private static final int NUM_VOLUMES = 2;
 
@@ -99,7 +100,7 @@ public class TestHddsVolumeChecker {
 
   private final ChunkLayOutVersion layout;
 
-  public TestHddsVolumeChecker(VolumeCheckResult result,
+  public TestStorageVolumeChecker(VolumeCheckResult result,
       ChunkLayOutVersion layout) {
     this.expectedVolumeHealth = result;
     this.layout = layout;
@@ -137,7 +138,7 @@ public class TestHddsVolumeChecker {
 
 
   /**
-   * Test {@link HddsVolumeChecker#checkVolume} propagates the
+   * Test {@link StorageVolumeChecker#checkVolume} propagates the
    * check to the delegate checker.
    *
    * @throws Exception
@@ -146,8 +147,8 @@ public class TestHddsVolumeChecker {
   public void testCheckOneVolume() throws Exception {
     LOG.info("Executing {}", testName.getMethodName());
     final HddsVolume volume = makeVolumes(1, expectedVolumeHealth).get(0);
-    final HddsVolumeChecker checker =
-        new HddsVolumeChecker(new OzoneConfiguration(), new FakeTimer());
+    final StorageVolumeChecker checker =
+        new StorageVolumeChecker(new OzoneConfiguration(), new FakeTimer());
     checker.setDelegateChecker(new DummyChecker());
     final AtomicLong numCallbackInvocations = new AtomicLong(0);
 
@@ -177,7 +178,7 @@ public class TestHddsVolumeChecker {
   }
 
   /**
-   * Test {@link HddsVolumeChecker#checkAllVolumes} propagates
+   * Test {@link StorageVolumeChecker#checkAllVolumes} propagates
    * checks for all volumes to the delegate checker.
    *
    * @throws Exception
@@ -188,11 +189,12 @@ public class TestHddsVolumeChecker {
 
     final List<HddsVolume> volumes = makeVolumes(
         NUM_VOLUMES, expectedVolumeHealth);
-    final HddsVolumeChecker checker =
-        new HddsVolumeChecker(new OzoneConfiguration(), new FakeTimer());
+    final StorageVolumeChecker checker =
+        new StorageVolumeChecker(new OzoneConfiguration(), new FakeTimer());
     checker.setDelegateChecker(new DummyChecker());
 
-    Set<HddsVolume> failedVolumes = checker.checkAllVolumes(volumes);
+    Set<? extends StorageVolume> failedVolumes =
+        checker.checkAllVolumes(volumes);
     LOG.info("Got back {} failed volumes", failedVolumes.size());
 
     if (expectedVolumeHealth == null || expectedVolumeHealth == FAILED) {
@@ -209,7 +211,7 @@ public class TestHddsVolumeChecker {
 
 
   /**
-   * Test {@link HddsVolumeChecker#checkAllVolumes} propagates
+   * Test {@link StorageVolumeChecker#checkAllVolumes} propagates
    * checks for all volumes to the delegate checker.
    *
    * @throws Exception
@@ -218,9 +220,10 @@ public class TestHddsVolumeChecker {
   public void testVolumeDeletion() throws Exception {
     LOG.info("Executing {}", testName.getMethodName());
 
-    conf.setTimeDuration(
-        DFSConfigKeysLegacy.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, 0,
-        TimeUnit.MILLISECONDS);
+    DatanodeConfiguration dnConf =
+        conf.getObject(DatanodeConfiguration.class);
+    dnConf.setDiskCheckMinGap(Duration.ofMillis(0));
+    conf.setFromObject(dnConf);
 
     DatanodeDetails datanodeDetails =
         ContainerTestUtils.createDatanodeDetails();
@@ -229,7 +232,7 @@ public class TestHddsVolumeChecker {
     MutableVolumeSet volumeSet = ozoneContainer.getVolumeSet();
     ContainerSet containerSet = ozoneContainer.getContainerSet();
 
-    HddsVolumeChecker volumeChecker = volumeSet.getVolumeChecker();
+    StorageVolumeChecker volumeChecker = volumeSet.getVolumeChecker();
     volumeChecker.setDelegateChecker(new DummyChecker());
     File volParentDir =
         new File(folder.getRoot(), UUID.randomUUID().toString());
@@ -243,7 +246,8 @@ public class TestHddsVolumeChecker {
         Container container = ContainerTestUtils.getContainer(++i, layout,
             state);
         container.getContainerData()
-            .setVolume(volumeSet.getVolumeMap().get(volRootDir.getPath()));
+            .setVolume((HddsVolume) volumeSet.getVolumeMap()
+                .get(volRootDir.getPath()));
         ((KeyValueContainerData) container.getContainerData())
             .setMetadataPath(volParentDir.getPath());
         containerSet.addContainer(container);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSet.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSet.java
index 2c15de9..52bf3d3 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSet.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSet.java
@@ -23,8 +23,10 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.GenericTestUtils.LogCapturer;
@@ -64,7 +66,8 @@ public class TestVolumeSet {
   private static final String DUMMY_IP_ADDR = "0.0.0.0";
 
   private void initializeVolumeSet() throws Exception {
-    volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf, null);
+    volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf,
+        null, StorageVolume.VolumeType.DATA_VOLUME, null);
   }
 
   @Rule
@@ -77,28 +80,30 @@ public class TestVolumeSet {
     volumes.add(volume1);
     volumes.add(volume2);
     conf.set(DFSConfigKeysLegacy.DFS_DATANODE_DATA_DIR_KEY, dataDirKey);
+    conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
+        dataDirKey);
     initializeVolumeSet();
   }
 
   @After
   public void shutdown() throws IOException {
-    // Delete the hdds volume root dir
-    List<HddsVolume> hddsVolumes = new ArrayList<>();
-    hddsVolumes.addAll(volumeSet.getVolumesList());
-    hddsVolumes.addAll(volumeSet.getFailedVolumesList());
+    // Delete the volume root dir
+    List<StorageVolume> vols = new ArrayList<>();
+    vols.addAll(volumeSet.getVolumesList());
+    vols.addAll(volumeSet.getFailedVolumesList());
 
-    for (HddsVolume volume : hddsVolumes) {
-      FileUtils.deleteDirectory(volume.getHddsRootDir());
+    for (StorageVolume volume : vols) {
+      FileUtils.deleteDirectory(volume.getStorageDir());
     }
     volumeSet.shutdown();
 
     FileUtil.fullyDelete(new File(baseDir));
   }
 
-  private boolean checkVolumeExistsInVolumeSet(String volume) {
-    for (HddsVolume hddsVolume : volumeSet.getVolumesList()) {
-      if (hddsVolume.getHddsRootDir().getPath().equals(
-          HddsVolumeUtil.getHddsRoot(volume))) {
+  private boolean checkVolumeExistsInVolumeSet(String volumeRoot) {
+    for (StorageVolume volume : volumeSet.getVolumesList()) {
+      if (volume.getStorageDir().getPath().equals(volumeRoot)
+          || volume.getStorageDir().getParent().equals(volumeRoot)) {
         return true;
       }
     }
@@ -108,7 +113,7 @@ public class TestVolumeSet {
   @Test
   public void testVolumeSetInitialization() throws Exception {
 
-    List<HddsVolume> volumesList = volumeSet.getVolumesList();
+    List<StorageVolume> volumesList = volumeSet.getVolumesList();
 
     // VolumeSet initialization should add volume1 and volume2 to VolumeSet
     assertEquals("VolumeSet intialization is incorrect",
@@ -138,7 +143,7 @@ public class TestVolumeSet {
   public void testFailVolume() throws Exception {
 
     //Fail a volume
-    volumeSet.failVolume(volume1);
+    volumeSet.failVolume(HddsVolumeUtil.getHddsRoot(volume1));
 
     // Failed volume should not show up in the volumeList
     assertEquals(1, volumeSet.getVolumesList().size());
@@ -148,8 +153,7 @@ public class TestVolumeSet {
         1, volumeSet.getFailedVolumesList().size());
     assertEquals("Failed Volume list did not match",
         HddsVolumeUtil.getHddsRoot(volume1),
-        volumeSet.getFailedVolumesList().get(0).getHddsRootDir().getPath());
-    assertTrue(volumeSet.getFailedVolumesList().get(0).isFailed());
+        volumeSet.getFailedVolumesList().get(0).getStorageDir().getPath());
 
     // Failed volume should not exist in VolumeMap
     assertFalse(volumeSet.getVolumeMap().containsKey(volume1));
@@ -161,14 +165,14 @@ public class TestVolumeSet {
     assertEquals(2, volumeSet.getVolumesList().size());
 
     // Remove a volume from VolumeSet
-    volumeSet.removeVolume(volume1);
+    volumeSet.removeVolume(HddsVolumeUtil.getHddsRoot(volume1));
     assertEquals(1, volumeSet.getVolumesList().size());
 
     // Attempting to remove a volume which does not exist in VolumeSet should
     // log a warning.
     LogCapturer logs = LogCapturer.captureLogs(
         LogFactory.getLog(MutableVolumeSet.class));
-    volumeSet.removeVolume(volume1);
+    volumeSet.removeVolume(HddsVolumeUtil.getHddsRoot(volume1));
     assertEquals(1, volumeSet.getVolumesList().size());
     String expectedLogMessage = "Volume : " +
         HddsVolumeUtil.getHddsRoot(volume1) + " does not exist in VolumeSet";
@@ -209,12 +213,12 @@ public class TestVolumeSet {
 
   @Test
   public void testShutdown() throws Exception {
-    List<HddsVolume> volumesList = volumeSet.getVolumesList();
+    List<StorageVolume> volumesList = volumeSet.getVolumesList();
 
     volumeSet.shutdown();
 
     // Verify that volume usage can be queried during shutdown.
-    for (HddsVolume volume : volumesList) {
+    for (StorageVolume volume : volumesList) {
       Assert.assertNotNull(volume.getVolumeInfo().getUsageForTesting());
       volume.getAvailable();
     }
@@ -230,11 +234,13 @@ public class TestVolumeSet {
     OzoneConfiguration ozoneConfig = new OzoneConfiguration();
     ozoneConfig.set(HDDS_DATANODE_DIR_KEY, readOnlyVolumePath.getAbsolutePath()
         + "," + volumePath.getAbsolutePath());
+    ozoneConfig.set(HddsConfigKeys.OZONE_METADATA_DIRS,
+        volumePath.getAbsolutePath());
     volSet = new MutableVolumeSet(UUID.randomUUID().toString(), ozoneConfig,
-        null);
+        null, StorageVolume.VolumeType.DATA_VOLUME, null);
     assertEquals(1, volSet.getFailedVolumesList().size());
     assertEquals(readOnlyVolumePath, volSet.getFailedVolumesList().get(0)
-        .getHddsRootDir());
+        .getStorageDir());
 
     //Set back to writable
     try {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
index c17b4ca..76e771d 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
@@ -29,6 +29,7 @@ import java.util.UUID;
 import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -42,6 +43,8 @@ import org.junit.After;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -89,7 +92,8 @@ public class TestVolumeSetDiskChecks {
 
     conf = getConfWithDataNodeDirs(numVolumes);
     final MutableVolumeSet volumeSet =
-        new MutableVolumeSet(UUID.randomUUID().toString(), conf, null);
+        new MutableVolumeSet(UUID.randomUUID().toString(), conf,
+            null, StorageVolume.VolumeType.DATA_VOLUME, null);
 
     assertThat(volumeSet.getVolumesList().size(), is(numVolumes));
     assertThat(volumeSet.getFailedVolumesList().size(), is(0));
@@ -113,19 +117,27 @@ public class TestVolumeSetDiskChecks {
     final int numBadVolumes = 2;
 
     conf = getConfWithDataNodeDirs(numVolumes);
+    StorageVolumeChecker dummyChecker =
+        new DummyChecker(conf, new Timer(), numBadVolumes);
     final MutableVolumeSet volumeSet = new MutableVolumeSet(
-        UUID.randomUUID().toString(), conf, null) {
-      @Override
-      HddsVolumeChecker getVolumeChecker(ConfigurationSource configuration)
-          throws DiskErrorException {
-        return new DummyChecker(configuration, new Timer(), numBadVolumes);
-      }
-    };
-
-    assertThat(volumeSet.getFailedVolumesList().size(), is(numBadVolumes));
-    assertThat(volumeSet.getVolumesList().size(),
-        is(numVolumes - numBadVolumes));
+        UUID.randomUUID().toString(), conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME,
+        dummyChecker);
+    final MutableVolumeSet metaVolumeSet = new MutableVolumeSet(
+        UUID.randomUUID().toString(), conf, null,
+        StorageVolume.VolumeType.META_VOLUME,
+        dummyChecker);
+
+    Assert.assertEquals(volumeSet.getFailedVolumesList().size(),
+        numBadVolumes);
+    Assert.assertEquals(volumeSet.getVolumesList().size(),
+        numVolumes - numBadVolumes);
+    Assert.assertEquals(metaVolumeSet.getFailedVolumesList().size(),
+        numBadVolumes);
+    Assert.assertEquals(metaVolumeSet.getVolumesList().size(),
+        numVolumes - numBadVolumes);
     volumeSet.shutdown();
+    metaVolumeSet.shutdown();
   }
 
   /**
@@ -136,19 +148,24 @@ public class TestVolumeSetDiskChecks {
     final int numVolumes = 5;
 
     conf = getConfWithDataNodeDirs(numVolumes);
+    StorageVolumeChecker dummyChecker =
+        new DummyChecker(conf, new Timer(), numVolumes);
 
     final MutableVolumeSet volumeSet = new MutableVolumeSet(
-        UUID.randomUUID().toString(), conf, null) {
-      @Override
-      HddsVolumeChecker getVolumeChecker(ConfigurationSource configuration)
-          throws DiskErrorException {
-        return new DummyChecker(configuration, new Timer(), numVolumes);
-      }
-    };
+        UUID.randomUUID().toString(), conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME,
+        new DummyChecker(conf, new Timer(), numVolumes));
+    final MutableVolumeSet metaVolumeSet = new MutableVolumeSet(
+        UUID.randomUUID().toString(), conf, null,
+        StorageVolume.VolumeType.META_VOLUME,
+        dummyChecker);
 
     assertEquals(volumeSet.getFailedVolumesList().size(), numVolumes);
     assertEquals(volumeSet.getVolumesList().size(), 0);
+    assertEquals(metaVolumeSet.getFailedVolumesList().size(), numVolumes);
+    assertEquals(metaVolumeSet.getVolumesList().size(), 0);
     volumeSet.shutdown();
+    metaVolumeSet.shutdown();
   }
 
   /**
@@ -164,9 +181,17 @@ public class TestVolumeSetDiskChecks {
     }
     ozoneConf.set(DFSConfigKeysLegacy.DFS_DATANODE_DATA_DIR_KEY,
         String.join(",", dirs));
+
+    final List<String> metaDirs = new ArrayList<>();
+    for (int i = 0; i < numDirs; ++i) {
+      metaDirs.add(GenericTestUtils.getRandomizedTestDir().getPath());
+    }
+    ozoneConf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
+        String.join(",", metaDirs));
     DatanodeConfiguration dnConf =
         ozoneConf.getObject(DatanodeConfiguration.class);
-    dnConf.setFailedVolumesTolerated(numDirs);
+    dnConf.setFailedDataVolumesTolerated(numDirs * 2);
+    dnConf.setFailedMetadataVolumesTolerated(numDirs * 2);
     ozoneConf.setFromObject(dnConf);
     return ozoneConf;
   }
@@ -175,7 +200,7 @@ public class TestVolumeSetDiskChecks {
    * A no-op checker that fails the given number of volumes and succeeds
    * the rest.
    */
-  static class DummyChecker extends HddsVolumeChecker {
+  static class DummyChecker extends StorageVolumeChecker {
     private final int numBadVolumes;
 
     DummyChecker(ConfigurationSource conf, Timer timer, int numBadVolumes)
@@ -185,7 +210,8 @@ public class TestVolumeSetDiskChecks {
     }
 
     @Override
-    public Set<HddsVolume> checkAllVolumes(Collection<HddsVolume> volumes)
+    public Set<? extends StorageVolume> checkAllVolumes(
+        Collection<? extends StorageVolume> volumes)
         throws InterruptedException {
       // Return the first 'numBadVolumes' as failed.
       return ImmutableSet.copyOf(Iterables.limit(volumes, numBadVolumes));
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java
index a5f28e9..264c6bb 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
@@ -36,6 +37,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
 import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.ozone.test.GenericTestUtils;
 
@@ -85,7 +87,9 @@ public class TestKeyValueBlockIterator {
     testRoot = GenericTestUtils.getRandomizedTestDir();
     conf = new OzoneConfiguration();
     conf.set(HDDS_DATANODE_DIR_KEY, testRoot.getAbsolutePath());
-    volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf, null);
+    conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testRoot.getAbsolutePath());
+    volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
 
     containerData = new KeyValueContainerData(105L,
             layout,
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
index 4b27363..fa97cd1 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume
     .RoundRobinVolumeChoosingPolicy;
@@ -178,8 +179,8 @@ public class TestKeyValueContainer {
             datanodeId.toString());
     KeyValueContainer container = new KeyValueContainer(containerData, CONF);
 
-    HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
-        .getVolumesList(), 1);
+    HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(
+        StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()), 1);
     String hddsVolumeDir = containerVolume.getHddsRootDir().toString();
 
     container.populatePathFields(scmId, containerVolume, hddsVolumeDir);
@@ -220,8 +221,8 @@ public class TestKeyValueContainer {
             datanodeId.toString());
     container = new KeyValueContainer(containerData, CONF);
 
-    containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
-        .getVolumesList(), 1);
+    containerVolume = volumeChoosingPolicy.chooseVolume(
+        StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()), 1);
     hddsVolumeDir = containerVolume.getHddsRootDir().toString();
     container.populatePathFields(scmId, containerVolume, hddsVolumeDir);
     try {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
index 1b2f781..71175b6 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChecksumData;
@@ -32,6 +33,7 @@ import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
@@ -96,8 +98,10 @@ import static org.junit.Assert.assertFalse;
     this.testRoot = GenericTestUtils.getRandomizedTestDir();
     conf = new OzoneConfiguration();
     conf.set(HDDS_DATANODE_DIR_KEY, testRoot.getAbsolutePath());
+    conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testRoot.getAbsolutePath());
     chunkManagerTestInfo.updateConfig(conf);
-    volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf, null);
+    volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
     chunkManager = chunkManagerTestInfo.createChunkManager(true, null);
   }
 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index 0c6092b..583d043 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -46,10 +46,12 @@ import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.ozone.test.GenericTestUtils;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
 import static org.junit.Assert.assertEquals;
 
@@ -263,9 +265,10 @@ public class TestKeyValueHandler {
     File path = GenericTestUtils.getRandomizedTestDir();
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.set(HDDS_DATANODE_DIR_KEY, path.getAbsolutePath());
+    conf.set(OZONE_METADATA_DIRS, path.getAbsolutePath());
     MutableVolumeSet
         volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf,
-        null);
+        null, StorageVolume.VolumeType.DATA_VOLUME, null);
     try {
       ContainerSet cset = new ContainerSet();
       int[] interval = new int[1];
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
index ad87629..2cd9673 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
@@ -35,6 +36,7 @@ import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
@@ -268,8 +270,11 @@ public class TestContainerReader {
     }
     conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
         datanodeDirs.toString());
+    conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
+        datanodeDirs.toString());
     MutableVolumeSet volumeSets =
-        new MutableVolumeSet(datanodeId.toString(), clusterId, conf, null);
+        new MutableVolumeSet(datanodeId.toString(), clusterId, conf, null,
+            StorageVolume.VolumeType.DATA_VOLUME, null);
     ContainerCache cache = ContainerCache.getInstance(conf);
     cache.clear();
 
@@ -303,12 +308,12 @@ public class TestContainerReader {
       BlockUtils.removeDB(keyValueContainerData, conf);
     }
 
-    List<HddsVolume> hddsVolumes = volumeSets.getVolumesList();
+    List<StorageVolume> volumes = volumeSets.getVolumesList();
     ContainerReader[] containerReaders = new ContainerReader[volumeNum];
     Thread[] threads = new Thread[volumeNum];
     for (int i = 0; i < volumeNum; i++) {
       containerReaders[i] = new ContainerReader(volumeSets,
-          hddsVolumes.get(i), containerSet, conf);
+          (HddsVolume) volumes.get(i), containerSet, conf);
       threads[i] = new Thread(containerReaders[i]);
     }
     long startTime = System.currentTimeMillis();
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index 85c61d4..1aa2940 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -38,9 +38,11 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
@@ -105,7 +107,7 @@ public class TestOzoneContainer {
         folder.newFolder().getAbsolutePath());
     commitSpaceMap = new HashMap<String, Long>();
     volumeSet = new MutableVolumeSet(datanodeDetails.getUuidString(), conf,
-        null);
+        null, StorageVolume.VolumeType.DATA_VOLUME, null);
     volumeChoosingPolicy = new RoundRobinVolumeChoosingPolicy();
   }
 
@@ -121,7 +123,9 @@ public class TestOzoneContainer {
   public void testBuildContainerMap() throws Exception {
 
     // Format the volumes
-    for (HddsVolume volume : volumeSet.getVolumesList()) {
+    List<HddsVolume> volumes =
+        StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList());
+    for (HddsVolume volume : volumes) {
       volume.format(clusterId);
       commitSpaceMap.put(getVolumeKey(volume), Long.valueOf(0));
     }
@@ -219,8 +223,10 @@ public class TestOzoneContainer {
   public void testContainerCreateDiskFull() throws Exception {
     long containerSize = (long) StorageUnit.MB.toBytes(100);
 
+    List<HddsVolume> volumes =
+        StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList());
     // Format the volumes
-    for (HddsVolume volume : volumeSet.getVolumesList()) {
+    for (HddsVolume volume : volumes) {
       volume.format(UUID.randomUUID().toString());
 
       // eat up all available space except size of 1 container
@@ -247,7 +253,9 @@ public class TestOzoneContainer {
 
   //verify committed space on each volume
   private void verifyCommittedSpace(OzoneContainer oc) {
-    for (HddsVolume dnVol : oc.getVolumeSet().getVolumesList()) {
+    List<HddsVolume> volumes = StorageVolumeUtil.getHddsVolumesList(
+        oc.getVolumeSet().getVolumesList());
+    for (HddsVolume dnVol : volumes) {
       String key = getVolumeKey(dnVol);
       long expectedCommit = commitSpaceMap.get(key).longValue();
       long volumeCommitted = dnVol.getCommittedBytes();
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
index a8f4609..309d377 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
@@ -65,10 +65,12 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import static org.apache.hadoop.hdds.DFSConfigKeysLegacy.DFS_DATANODE_DATA_DIR_KEY;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys;
 import static org.apache.hadoop.hdds.HddsUtils.getPortNumberFromConfigKeys;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_DEFAULT;
@@ -372,6 +374,20 @@ public final class HddsServerUtil {
     return rawLocations;
   }
 
+  public static Collection<String> getDatanodeStorageDirs(
+      ConfigurationSource conf) {
+    Collection<String> rawLocations = conf.getTrimmedStringCollection(
+        HDDS_DATANODE_DIR_KEY);
+    if (rawLocations.isEmpty()) {
+      rawLocations = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
+    }
+    if (rawLocations.isEmpty()) {
+      throw new IllegalArgumentException("No location configured in either "
+          + HDDS_DATANODE_DIR_KEY + " or " + DFS_DATANODE_DATA_DIR_KEY);
+    }
+    return rawLocations;
+  }
+
   /**
    * Get the path for datanode id file.
    *
diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index a6b5794..8fc017f 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -162,6 +162,10 @@ message StorageReportProto {
 message MetadataStorageReportProto {
   required string storageLocation = 1;
   optional StorageTypeProto storageType = 2 [default = DISK];
+  optional uint64 capacity = 3 [default = 0];
+  optional uint64 scmUsed = 4 [default = 0];
+  optional uint64 remaining = 5 [default = 0];
+  optional bool failed = 6 [default = false];
 }
 
 /**
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index 5a211c6..b25cd38 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.states.endpoint.HeartbeatEndpointTask;
 import org.apache.hadoop.ozone.container.common.states.endpoint.RegisterEndpointTask;
 import org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
@@ -192,8 +193,8 @@ public class TestEndPoint {
       newState = versionTask.call();
       Assert.assertEquals(EndpointStateMachine.EndPointStates.SHUTDOWN,
           newState);
-      List<HddsVolume> volumesList = ozoneContainer.getVolumeSet()
-          .getFailedVolumesList();
+      List<HddsVolume> volumesList = StorageVolumeUtil.getHddsVolumesList(
+          ozoneContainer.getVolumeSet().getFailedVolumesList());
       Assert.assertTrue(volumesList.size() == 1);
       Assert.assertTrue(logCapturer.getOutput()
           .contains("org.apache.hadoop.ozone.common" +
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
index cb44734..61559fb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.container.metrics;
 
 import java.io.File;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
@@ -44,8 +45,10 @@ import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.ozone.test.GenericTestUtils;
 
@@ -91,8 +94,10 @@ public class TestContainerMetrics {
 
       DatanodeDetails datanodeDetails = randomDatanodeDetails();
       conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, path);
+      conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, path);
       VolumeSet volumeSet = new MutableVolumeSet(
-          datanodeDetails.getUuidString(), conf, null);
+          datanodeDetails.getUuidString(), conf,
+          null, StorageVolume.VolumeType.DATA_VOLUME, null);
       ContainerSet containerSet = new ContainerSet();
       DatanodeStateMachine stateMachine = Mockito.mock(
           DatanodeStateMachine.class);
@@ -159,7 +164,9 @@ public class TestContainerMetrics {
       assertQuantileGauges("WriteChunkNanos" + sec, containerMetrics);
 
       // Check VolumeIOStats metrics
-      HddsVolume hddsVolume = volumeSet.getVolumesList().get(0);
+      List<HddsVolume> volumes =
+          StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList());
+      HddsVolume hddsVolume = volumes.get(0);
       MetricsRecordBuilder volumeIOMetrics =
           getMetrics(hddsVolume.getVolumeIOStats().getMetricsSourceName());
       assertCounter("ReadBytes", 1024L, volumeIOMetrics);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
index d931ca7..adabe62 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGr
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
@@ -72,6 +73,7 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.ozone.test.GenericTestUtils;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
 import static org.apache.hadoop.hdds.HddsUtils.isReadOnly;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.SUCCESS;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
@@ -168,7 +170,9 @@ public class TestSecureContainerServer {
     conf.set(HDDS_DATANODE_DIR_KEY,
         Paths.get(TEST_DIR, "dfs", "data", "hdds",
             RandomStringUtils.randomAlphabetic(4)).toString());
-    VolumeSet volumeSet = new MutableVolumeSet(dd.getUuidString(), conf, null);
+    conf.set(OZONE_METADATA_DIRS, TEST_DIR);
+    VolumeSet volumeSet = new MutableVolumeSet(dd.getUuidString(), conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
     DatanodeStateMachine stateMachine = Mockito.mock(
         DatanodeStateMachine.class);
     StateContext context = Mockito.mock(StateContext.class);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/DatanodeTestUtils.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/DatanodeTestUtils.java
index fb5bb72..372a71a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/DatanodeTestUtils.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/DatanodeTestUtils.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.dn;
 
 import com.google.common.base.Supplier;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.ozone.test.GenericTestUtils;
 
@@ -181,7 +182,7 @@ public final class DatanodeTestUtils {
   /**
    * Simulate a bad rootDir by removing write permission.
    * @see {@link org.apache.hadoop.ozone.container.common.volume
-   * .HddsVolume#check(Boolean)}
+   * .StorageVolume#check(Boolean)}
    * @param rootDir
    */
   public static void simulateBadRootDir(File rootDir) {
@@ -193,16 +194,16 @@ public final class DatanodeTestUtils {
   /**
    * Simulate a bad volume by removing write permission.
    * @see {@link org.apache.hadoop.ozone.container.common.volume
-   * .HddsVolume#check(Boolean)}
+   * .StorageVolume#check(Boolean)}
    * @param vol
    */
-  public static void simulateBadVolume(HddsVolume vol) {
-    simulateBadRootDir(vol.getHddsRootDir());
+  public static void simulateBadVolume(StorageVolume vol) {
+    simulateBadRootDir(vol.getStorageDir());
   }
 
   /**
    * Restore a simulated bad volume to normal.
-   * @see {@link #simulateBadVolume(HddsVolume)}
+   * @see {@link #simulateBadVolume(StorageVolume)}
    * @param rootDir
    */
   public static void restoreBadRootDir(File rootDir) {
@@ -213,11 +214,11 @@ public final class DatanodeTestUtils {
 
   /**
    * Restore a simulated bad rootDir to normal.
-   * @see {@link #simulateBadVolume(HddsVolume)}
+   * @see {@link #simulateBadVolume(StorageVolume)}
    * @param vol
    */
-  public static void restoreBadVolume(HddsVolume vol) {
-    restoreBadRootDir(vol.getHddsRootDir());
+  public static void restoreBadVolume(StorageVolume vol) {
+    restoreBadRootDir(vol.getStorageDir());
   }
 
   /**
@@ -254,7 +255,7 @@ public final class DatanodeTestUtils {
   }
 
   public static File getHddsVolumeClusterDir(HddsVolume vol) {
-    File hddsVolumeRootDir = vol.getHddsRootDir();
+    File hddsVolumeRootDir = vol.getStorageDir();
     return new File(hddsVolumeRootDir, vol.getClusterID());
   }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/TestDatanodeLayoutUpgradeTool.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/TestDatanodeLayoutUpgradeTool.java
index fa7a3c1..07bc3fc 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/TestDatanodeLayoutUpgradeTool.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/TestDatanodeLayoutUpgradeTool.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.ozone.dn;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.client.ObjectStore;
@@ -26,7 +27,6 @@ import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.debug.DatanodeLayout;
 import org.junit.Before;
 import org.junit.After;
@@ -107,7 +107,7 @@ public class TestDatanodeLayoutUpgradeTool {
 
     List<HddsDatanodeService> dns = cluster.getHddsDatanodes();
     OzoneConfiguration c1 = dns.get(0).getConf();
-    Collection<String> paths = MutableVolumeSet.getDatanodeStorageDirs(c1);
+    Collection<String> paths = HddsServerUtil.getDatanodeStorageDirs(c1);
 
     for (String p : paths) {
       // Verify that tool is able to verify the storage path
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/TestDatanodeHddsVolumeFailureDetection.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/TestDatanodeHddsVolumeFailureDetection.java
index ef61ad1..cd40355 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/TestDatanodeHddsVolumeFailureDetection.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/TestDatanodeHddsVolumeFailureDetection.java
@@ -19,7 +19,6 @@
  */
 package org.apache.hadoop.ozone.dn.volume;
 
-import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -42,6 +41,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.dn.DatanodeTestUtils;
 import org.junit.After;
@@ -51,10 +51,10 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 
 import org.junit.Rule;
 import org.junit.rules.Timeout;
@@ -93,14 +93,12 @@ public class TestDatanodeHddsVolumeFailureDetection {
     // keep the cache size = 1, so we could trigger io exception on
     // reading on-disk db instance
     ozoneConfig.setInt(OZONE_CONTAINER_CACHE_SIZE, 1);
-    // shorten the gap between successive checks to ease tests
-    ozoneConfig.setTimeDuration(
-        DFSConfigKeysLegacy.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, 5,
-        TimeUnit.SECONDS);
     // set tolerated = 1
+    // shorten the gap between successive checks to ease tests
     DatanodeConfiguration dnConf =
         ozoneConfig.getObject(DatanodeConfiguration.class);
-    dnConf.setFailedVolumesTolerated(1);
+    dnConf.setFailedDataVolumesTolerated(1);
+    dnConf.setDiskCheckMinGap(Duration.ofSeconds(5));
     ozoneConfig.setFromObject(dnConf);
     cluster = MiniOzoneCluster.newBuilder(ozoneConfig)
         .setNumDatanodes(1)
@@ -150,8 +148,10 @@ public class TestDatanodeHddsVolumeFailureDetection {
     HddsDatanodeService dn = datanodes.get(0);
     OzoneContainer oc = dn.getDatanodeStateMachine().getContainer();
     MutableVolumeSet volSet = oc.getVolumeSet();
-    HddsVolume vol0 = volSet.getVolumesList().get(0);
-    File clusterDir = DatanodeTestUtils.getHddsVolumeClusterDir(vol0);
+    StorageVolume vol0 = volSet.getVolumesList().get(0);
+    Assert.assertTrue(vol0 instanceof HddsVolume);
+    File clusterDir = DatanodeTestUtils.getHddsVolumeClusterDir(
+        (HddsVolume) vol0);
     File currentDir = new File(clusterDir, Storage.STORAGE_DIR_CURRENT);
     File containerTopDir = new File(currentDir, Storage.CONTAINER_DIR + "0");
     File containerDir = new File(containerTopDir, "1");
@@ -205,7 +205,7 @@ public class TestDatanodeHddsVolumeFailureDetection {
     HddsDatanodeService dn = datanodes.get(0);
     OzoneContainer oc = dn.getDatanodeStateMachine().getContainer();
     MutableVolumeSet volSet = oc.getVolumeSet();
-    HddsVolume vol0 = volSet.getVolumesList().get(0);
+    StorageVolume vol0 = volSet.getVolumesList().get(0);
     Container c1 = oc.getContainerSet().getContainer(container.
         getContainerInfo().getContainerID());
     File metadataDir = new File(c1.getContainerFile().getParent());
@@ -267,7 +267,7 @@ public class TestDatanodeHddsVolumeFailureDetection {
     // simulate bad volume by removing write permission on root dir
     // refer to HddsVolume.check()
     MutableVolumeSet volSet = oc.getVolumeSet();
-    HddsVolume vol0 = volSet.getVolumesList().get(0);
+    StorageVolume vol0 = volSet.getVolumesList().get(0);
     DatanodeTestUtils.simulateBadVolume(vol0);
 
     // read written file to trigger checkVolumeAsync
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/TestDatanodeHddsVolumeFailureToleration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/TestDatanodeHddsVolumeFailureToleration.java
index 55dd958..e2f29e5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/TestDatanodeHddsVolumeFailureToleration.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/TestDatanodeHddsVolumeFailureToleration.java
@@ -24,8 +24,8 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
-import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.dn.DatanodeTestUtils;
 import org.junit.After;
@@ -63,7 +63,7 @@ public class TestDatanodeHddsVolumeFailureToleration {
     // set tolerated = 1
     DatanodeConfiguration dnConf =
         ozoneConfig.getObject(DatanodeConfiguration.class);
-    dnConf.setFailedVolumesTolerated(1);
+    dnConf.setFailedDataVolumesTolerated(1);
     ozoneConfig.setFromObject(dnConf);
     cluster = MiniOzoneCluster.newBuilder(ozoneConfig)
         .setNumDatanodes(1)
@@ -85,9 +85,9 @@ public class TestDatanodeHddsVolumeFailureToleration {
     HddsDatanodeService dn = datanodes.get(0);
     OzoneContainer oc = dn.getDatanodeStateMachine().getContainer();
     MutableVolumeSet volSet = oc.getVolumeSet();
-    HddsVolume vol0 = volSet.getVolumesList().get(0);
+    StorageVolume vol0 = volSet.getVolumesList().get(0);
     // keep the file for restore since we'll do restart
-    File volRootDir0 = vol0.getHddsRootDir();
+    File volRootDir0 = vol0.getStorageDir();
 
     // simulate bad volumes <= tolerated
     DatanodeTestUtils.simulateBadRootDir(volRootDir0);
@@ -106,10 +106,10 @@ public class TestDatanodeHddsVolumeFailureToleration {
     HddsDatanodeService dn = datanodes.get(0);
     OzoneContainer oc = dn.getDatanodeStateMachine().getContainer();
     MutableVolumeSet volSet = oc.getVolumeSet();
-    HddsVolume vol0 = volSet.getVolumesList().get(0);
-    HddsVolume vol1 = volSet.getVolumesList().get(1);
-    File volRootDir0 = vol0.getHddsRootDir();
-    File volRootDir1 = vol1.getHddsRootDir();
+    StorageVolume vol0 = volSet.getVolumesList().get(0);
+    StorageVolume vol1 = volSet.getVolumesList().get(1);
+    File volRootDir0 = vol0.getStorageDir();
+    File volRootDir1 = vol1.getStorageDir();
 
     // simulate bad volumes > tolerated
     DatanodeTestUtils.simulateBadRootDir(volRootDir0);
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DatanodeLayout.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DatanodeLayout.java
index 2bb121d..c35e00e 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DatanodeLayout.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DatanodeLayout.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.cli.SubcommandWithParent;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
@@ -96,11 +97,14 @@ public class DatanodeLayout extends GenericCli
     OzoneContainer.buildContainerSet(volumeSet, containerSet, conf);
     volumeSet.shutdown();
 
+    List<HddsVolume> failedVolumes = StorageVolumeUtil.getHddsVolumesList(
+        volumeSet.getFailedVolumesList());
+
     if (verify) {
-      for (HddsVolume vol : volumeSet.getFailedVolumesList()) {
+      for (HddsVolume vol : failedVolumes) {
         System.out.println("Failed Volume:" + vol.getHddsRootDir());
       }
     }
-    return volumeSet.getFailedVolumesList();
+    return failedVolumes;
   }
 }
\ No newline at end of file
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
index b355eb6..96c701f 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.server.JsonUtils;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
@@ -35,8 +36,10 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerReader;
 import org.apache.hadoop.ozone.debug.OzoneDebug;
@@ -56,6 +59,7 @@ import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Callable;
@@ -114,7 +118,8 @@ public class ContainerCommands implements Callable<Void>, SubcommandWithParent {
 
     String clusterId = getClusterId(firstStorageDir);
 
-    volumeSet = new MutableVolumeSet(datanodeUuid, conf, null);
+    volumeSet = new MutableVolumeSet(datanodeUuid, conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
 
     Map<ContainerProtos.ContainerType, Handler> handlers = new HashMap<>();
 
@@ -136,8 +141,9 @@ public class ContainerCommands implements Callable<Void>, SubcommandWithParent {
 
     controller = new ContainerController(containerSet, handlers);
 
-    Iterator<HddsVolume> volumeSetIterator = volumeSet.getVolumesList()
-        .iterator();
+    List<HddsVolume> volumes = StorageVolumeUtil.getHddsVolumesList(
+        volumeSet.getVolumesList());
+    Iterator<HddsVolume> volumeSetIterator = volumes.iterator();
 
     LOG.info("Starting the read all the container metadata");
 
@@ -190,7 +196,7 @@ public class ContainerCommands implements Callable<Void>, SubcommandWithParent {
   private String getFirstStorageDir(ConfigurationSource config)
       throws IOException {
     final Collection<String> storageDirs =
-        MutableVolumeSet.getDatanodeStorageDirs(config);
+        HddsServerUtil.getDatanodeStorageDirs(config);
 
     return
         StorageLocation.parse(storageDirs.iterator().next())
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ChunkManagerDiskWrite.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ChunkManagerDiskWrite.java
index 0507b53..e95747c 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ChunkManagerDiskWrite.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ChunkManagerDiskWrite.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.ozone.container.common.transport.server.ratis.Dispatche
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
@@ -102,7 +103,8 @@ public class ChunkManagerDiskWrite extends BaseFreonGenerator implements
       OzoneConfiguration ozoneConfiguration = createOzoneConfiguration();
 
       VolumeSet volumeSet =
-          new MutableVolumeSet("dnid", "clusterid", ozoneConfiguration, null);
+          new MutableVolumeSet("dnid", "clusterid", ozoneConfiguration, null,
+              StorageVolume.VolumeType.DATA_VOLUME, null);
 
       Random random = new Random();
 
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
index 7f3de10..21a835c 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
@@ -38,11 +38,13 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
@@ -85,7 +87,7 @@ public class ClosedContainerReplicator extends BaseFreonGenerator implements
     OzoneConfiguration conf = createOzoneConfiguration();
 
     final Collection<String> datanodeStorageDirs =
-        MutableVolumeSet.getDatanodeStorageDirs(conf);
+        HddsServerUtil.getDatanodeStorageDirs(conf);
 
     for (String dir : datanodeStorageDirs) {
       checkDestinationDirectory(dir);
@@ -172,7 +174,7 @@ public class ClosedContainerReplicator extends BaseFreonGenerator implements
     ContainerMetrics metrics = ContainerMetrics.create(conf);
 
     MutableVolumeSet volumeSet = new MutableVolumeSet(fakeDatanodeUuid, conf,
-        null);
+        null, StorageVolume.VolumeType.DATA_VOLUME, null);
 
     Map<ContainerType, Handler> handlers = new HashMap<>();
 
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/containergenerator/GeneratorDatanode.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/containergenerator/GeneratorDatanode.java
index 5a0d00c..4aebad0 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/containergenerator/GeneratorDatanode.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/containergenerator/GeneratorDatanode.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumData;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.Checksum;
@@ -50,6 +51,7 @@ import org.apache.hadoop.ozone.container.common.transport.server.ratis.Dispatche
 import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl;
@@ -131,7 +133,7 @@ public class GeneratorDatanode extends BaseGenerator {
         .createChunkManager(config, blockManager, null);
 
     final Collection<String> storageDirs =
-        MutableVolumeSet.getDatanodeStorageDirs(config);
+        HddsServerUtil.getDatanodeStorageDirs(config);
 
     String firstStorageDir =
         StorageLocation.parse(storageDirs.iterator().next())
@@ -159,7 +161,8 @@ public class GeneratorDatanode extends BaseGenerator {
     datanodeId = HddsVolumeUtil
         .getProperty(props, OzoneConsts.DATANODE_UUID, versionFile);
 
-    volumeSet = new MutableVolumeSet(datanodeId, clusterId, config, null);
+    volumeSet = new MutableVolumeSet(datanodeId, clusterId, config, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
 
     volumeChoosingPolicy = new RoundRobinVolumeChoosingPolicy();
 
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
index c5cd876..c00e27e 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
@@ -51,6 +51,7 @@ import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.Level;
@@ -102,7 +103,8 @@ public class BenchMarkDatanodeDispatcher {
     conf.set("ozone.scm.container.size", "10MB");
 
     ContainerSet containerSet = new ContainerSet();
-    volumeSet = new MutableVolumeSet(datanodeUuid, conf, null);
+    volumeSet = new MutableVolumeSet(datanodeUuid, conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
     StateContext context = new StateContext(
         conf, DatanodeStates.RUNNING, null);
     ContainerMetrics metrics = ContainerMetrics.create(conf);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org