Posted to commits@ozone.apache.org by sa...@apache.org on 2022/04/22 07:34:41 UTC

[ozone] branch HDDS-3630 updated: HDDS-6541. [Merge rocksdb in datanode] Per-disk DB location management. (#3292)

This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-3630
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3630 by this push:
     new 30d3b4d1a3 HDDS-6541. [Merge rocksdb in datanode] Per-disk DB location management. (#3292)
30d3b4d1a3 is described below

commit 30d3b4d1a3c58e93bc057836f408a912be4b7bdf
Author: Gui Hecheng <ma...@tencent.com>
AuthorDate: Fri Apr 22 15:34:35 2022 +0800

    HDDS-6541. [Merge rocksdb in datanode] Per-disk DB location management. (#3292)
---
 .../hadoop/hdds/upgrade/HDDSLayoutFeature.java     |   4 +-
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   5 +
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   1 +
 .../common/src/main/resources/ozone-default.xml    |  14 +
 .../apache/hadoop/ozone/HddsDatanodeService.java   |   6 +-
 .../common/statemachine/DatanodeConfiguration.java |  49 +++
 .../common/statemachine/DatanodeStateMachine.java  |   4 +-
 .../states/endpoint/VersionEndpointTask.java       |  70 ++--
 .../container/common/utils/DatanodeStoreCache.java |  12 +-
 .../container/common/utils/HddsVolumeUtil.java     | 230 ++++---------
 .../container/common/utils/StorageVolumeUtil.java  | 202 +++++++++++
 .../ozone/container/common/volume/DbVolume.java    | 153 +++++++++
 ...dataVolumeFactory.java => DbVolumeFactory.java} |  35 +-
 .../ozone/container/common/volume/HddsVolume.java  | 376 ++++++++-------------
 .../container/common/volume/HddsVolumeFactory.java |  37 +-
 .../container/common/volume/MetadataVolume.java    |   5 +
 .../common/volume/MetadataVolumeFactory.java       |   2 +-
 .../container/common/volume/MutableVolumeSet.java  |   6 +
 .../container/common/volume/StorageVolume.java     | 271 ++++++++++++++-
 .../common/volume/StorageVolumeFactory.java        |  41 ++-
 .../ozone/container/ozoneimpl/OzoneContainer.java  |  28 +-
 .../ScmHAFinalizeUpgradeActionDatanode.java        |   2 +-
 .../upgrade/VersionedDatanodeFeatures.java         |  28 +-
 .../ozone/container/common/ContainerTestUtils.java |  13 +
 .../common/helpers/TestDatanodeVersionFile.java    |  18 +-
 .../statemachine/TestDatanodeConfiguration.java    |  11 +
 .../container/common/utils/TestHddsVolumeUtil.java | 238 +++++++++++++
 .../common/utils/TestStorageVolumeUtil.java        |  99 ++++++
 .../container/common/volume/TestDbVolume.java      | 172 ++++++++++
 .../container/common/volume/TestHddsVolume.java    | 164 +++++++--
 .../container/common/volume/TestStorageVolume.java |  83 +++++
 .../common/volume/TestVolumeSetDiskChecks.java     |  30 ++
 .../container/ozoneimpl/TestOzoneContainer.java    |  16 +-
 .../upgrade/TestDatanodeUpgradeToScmHA.java        |  31 +-
 .../apache/hadoop/hdds/utils/HddsServerUtil.java   |   7 +
 .../proto/ScmServerDatanodeHeartbeatProtocol.proto |   1 +
 .../non-rolling-upgrade/1.1.0-1.2.0/callback.sh    |   2 +-
 .../ozone/debug/container/ContainerCommands.java   |   3 +-
 .../containergenerator/GeneratorDatanode.java      |   8 +-
 39 files changed, 1925 insertions(+), 552 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutFeature.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutFeature.java
index 2bc1a6718c..655937b3af 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutFeature.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutFeature.java
@@ -31,7 +31,9 @@ public enum HDDSLayoutFeature implements LayoutFeature {
   INITIAL_VERSION(0, "Initial Layout Version"),
   DATANODE_SCHEMA_V2(1, "Datanode RocksDB Schema Version 2 (with column " +
       "families)"),
-  SCM_HA(2, "Storage Container Manager HA");
+  SCM_HA(2, "Storage Container Manager HA"),
+  DATANODE_SCHEMA_V3(3, "Datanode RocksDB Schema Version 3 (one rocksdb " +
+      "per disk)");
 
   //////////////////////////////  //////////////////////////////
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 627c432d3c..aa9f63ab53 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -328,6 +328,11 @@ public final class OzoneConfigKeys {
   public static final String
       HDDS_DATANODE_METADATA_ROCKSDB_CACHE_SIZE_DEFAULT = "1GB";
 
+  // Specifies the dedicated volumes for per-disk db instances.
+  // For container schema v3 only.
+  public static final String HDDS_DATANODE_CONTAINER_DB_DIR =
+      "hdds.datanode.container.db.dir";
+
   public static final String OZONE_SECURITY_ENABLED_KEY =
       "ozone.security.enabled";
   public static final boolean OZONE_SECURITY_ENABLED_DEFAULT = false;
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 7983f8eaf1..1a20fd05ce 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -135,6 +135,7 @@ public final class OzoneConsts {
   public static final String SCM_DB_NAME = "scm.db";
   public static final String OM_DB_BACKUP_PREFIX = "om.db.backup.";
   public static final String SCM_DB_BACKUP_PREFIX = "scm.db.backup.";
+  public static final String CONTAINER_DB_NAME = "container.db";
 
   public static final String STORAGE_DIR_CHUNKS = "chunks";
   public static final String OZONE_DB_CHECKPOINT_REQUEST_FLUSH =
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 24f0c454c2..d6abf81cc4 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -144,6 +144,20 @@
       tagged explicitly.
     </description>
   </property>
+  <property>
+    <name>hdds.datanode.container.db.dir</name>
+    <value/>
+    <tag>OZONE, CONTAINER, STORAGE, MANAGEMENT</tag>
+    <description>Determines where the per-disk rocksdb instances will be
+      stored. This setting is optional. If unspecified, then rocksdb instances
+      are stored on the same disk as HDDS data.
+      The directories should be tagged with corresponding storage types
+      ([SSD]/[DISK]/[ARCHIVE]/[RAM_DISK]) for storage policies. The default
+      storage type will be DISK if the directory does not have a storage type
+      tagged explicitly. Ideally, this should be mapped to a fast disk
+      like an SSD.
+    </description>
+  </property>
   <property>
     <name>hdds.datanode.dir.du.reserved</name>
     <value/>
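
The description above notes that when hdds.datanode.container.db.dir is
unset, the per-disk rocksdb instances fall back to the same disks as the
HDDS data. A minimal standalone sketch of that resolution order, using the
plain Hadoop Configuration API (the helper class and the fallback key
hdds.datanode.dir are assumptions for illustration, not the datanode's
actual code path):

    import org.apache.hadoop.conf.Configuration;

    public final class DbDirResolver {
      // Key introduced by this change, see OzoneConfigKeys above.
      private static final String DB_DIR_KEY =
          "hdds.datanode.container.db.dir";
      // Assumed fallback: the regular HDDS data directories.
      private static final String HDDS_DIR_KEY = "hdds.datanode.dir";

      private DbDirResolver() {
      }

      public static String[] resolveDbDirs(Configuration conf) {
        String[] dbDirs = conf.getTrimmedStrings(DB_DIR_KEY);
        if (dbDirs.length > 0) {
          // Dedicated (ideally SSD) volumes for the per-disk db instances.
          return dbDirs;
        }
        // Key unset: db instances live on the same disks as HDDS data.
        return conf.getTrimmedStrings(HDDS_DIR_KEY);
      }
    }
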
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
index 82a733aabd..ed46cde1e3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -57,7 +57,7 @@ import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
-import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
@@ -319,8 +319,8 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
 
     for (Map.Entry<String, StorageVolume> entry : volumeMap.entrySet()) {
       HddsVolume hddsVolume = (HddsVolume) entry.getValue();
-      boolean result = HddsVolumeUtil.checkVolume(hddsVolume, clusterId,
-          clusterId, conf, LOG);
+      boolean result = StorageVolumeUtil.checkVolume(hddsVolume, clusterId,
+          clusterId, conf, LOG, null);
       if (!result) {
         volumeSet.failVolume(hddsVolume.getHddsRootDir().getPath());
       }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
index 24df9f5b1e..d6f59d3f7d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
@@ -45,6 +45,8 @@ public class DatanodeConfiguration {
       "hdds.datanode.failed.data.volumes.tolerated";
   public static final String FAILED_METADATA_VOLUMES_TOLERATED_KEY =
       "hdds.datanode.failed.metadata.volumes.tolerated";
+  public static final String FAILED_DB_VOLUMES_TOLERATED_KEY =
+      "hdds.datanode.failed.db.volumes.tolerated";
   public static final String DISK_CHECK_MIN_GAP_KEY =
       "hdds.datanode.disk.check.min.gap";
   public static final String DISK_CHECK_TIMEOUT_KEY =
@@ -52,6 +54,8 @@ public class DatanodeConfiguration {
 
   public static final String WAIT_ON_ALL_FOLLOWERS =
       "hdds.datanode.wait.on.all.followers";
+  public static final String CONTAINER_SCHEMA_V3_ENABLED =
+      "hdds.datanode.container.schema.v3.enabled";
 
   static final boolean CHUNK_DATA_VALIDATION_CHECK_DEFAULT = false;
 
@@ -67,6 +71,8 @@ public class DatanodeConfiguration {
   static final long DISK_CHECK_TIMEOUT_DEFAULT =
       Duration.ofMinutes(10).toMillis();
 
+  static final boolean CONTAINER_SCHEMA_V3_ENABLED_DEFAULT = false;
+
   /**
    * Number of threads per volume that Datanode will use for chunk read.
    */
@@ -195,6 +201,17 @@ public class DatanodeConfiguration {
   )
   private int failedMetadataVolumesTolerated = FAILED_VOLUMES_TOLERATED_DEFAULT;
 
+  @Config(key = "failed.db.volumes.tolerated",
+      defaultValue = "-1",
+      type = ConfigType.INT,
+      tags = { DATANODE },
+      description = "The number of db volumes that are allowed to fail "
+          + "before a datanode stops offering service. "
+          + "Config this to -1 means unlimited, but we should have "
+          + "at least one good volume left."
+  )
+  private int failedDbVolumesTolerated = FAILED_VOLUMES_TOLERATED_DEFAULT;
+
   @Config(key = "disk.check.min.gap",
       defaultValue = "15m",
       type = ConfigType.TIME,
@@ -245,6 +262,15 @@ public class DatanodeConfiguration {
     this.waitOnAllFollowers = val;
   }
 
+  @Config(key = "container.schema.v3.enabled",
+      defaultValue = "false",
+      type = ConfigType.BOOLEAN,
+      tags = { DATANODE },
+      description = "Enable use of container schema v3(one rocksdb per disk)."
+  )
+  private boolean containerSchemaV3Enabled =
+      CONTAINER_SCHEMA_V3_ENABLED_DEFAULT;
+
   @PostConstruct
   public void validate() {
     if (containerDeleteThreads < 1) {
@@ -277,6 +303,13 @@ public class DatanodeConfiguration {
       failedMetadataVolumesTolerated = FAILED_VOLUMES_TOLERATED_DEFAULT;
     }
 
+    if (failedDbVolumesTolerated < -1) {
+      LOG.warn(FAILED_DB_VOLUMES_TOLERATED_KEY +
+              "must be greater than -1 and was set to {}. Defaulting to {}",
+          failedDbVolumesTolerated, FAILED_VOLUMES_TOLERATED_DEFAULT);
+      failedDbVolumesTolerated = FAILED_VOLUMES_TOLERATED_DEFAULT;
+    }
+
     if (diskCheckMinGap < 0) {
       LOG.warn(DISK_CHECK_MIN_GAP_KEY +
               " must be greater than zero and was set to {}. Defaulting to {}",
@@ -325,6 +358,14 @@ public class DatanodeConfiguration {
     this.failedMetadataVolumesTolerated = failedVolumesTolerated;
   }
 
+  public int getFailedDbVolumesTolerated() {
+    return failedDbVolumesTolerated;
+  }
+
+  public void setFailedDbVolumesTolerated(int failedVolumesTolerated) {
+    this.failedDbVolumesTolerated = failedVolumesTolerated;
+  }
+
   public Duration getDiskCheckMinGap() {
     return Duration.ofMillis(diskCheckMinGap);
   }
@@ -372,4 +413,12 @@ public class DatanodeConfiguration {
   public int getNumReadThreadPerVolume() {
     return numReadThreadPerVolume;
   }
+
+  public boolean getContainerSchemaV3Enabled() {
+    return this.containerSchemaV3Enabled;
+  }
+
+  public void setContainerSchemaV3Enabled(boolean containerSchemaV3Enabled) {
+    this.containerSchemaV3Enabled = containerSchemaV3Enabled;
+  }
 }
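
A minimal usage sketch for the two new settings, assuming the usual Ozone
typed-config pattern in which an @Config-annotated class is materialized
via getObject (the main method and values are illustrative only):

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;

    public class SchemaV3ConfigExample {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        // Opt in to container schema v3 and tolerate one failed db volume.
        conf.setBoolean(DatanodeConfiguration.CONTAINER_SCHEMA_V3_ENABLED, true);
        conf.setInt(DatanodeConfiguration.FAILED_DB_VOLUMES_TOLERATED_KEY, 1);

        DatanodeConfiguration dnConf =
            conf.getObject(DatanodeConfiguration.class);
        System.out.println("schema v3 enabled: "
            + dnConf.getContainerSchemaV3Enabled());
        System.out.println("failed db volumes tolerated: "
            + dnConf.getFailedDbVolumesTolerated());
      }
    }
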
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index ae3c3a94a2..df39f2aad6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -316,8 +316,8 @@ public class DatanodeStateMachine implements Closeable {
   public void handleFatalVolumeFailures() {
     LOG.error("DatanodeStateMachine Shutdown due to too many bad volumes, "
         + "check " + DatanodeConfiguration.FAILED_DATA_VOLUMES_TOLERATED_KEY
-        + " and "
-        + DatanodeConfiguration.FAILED_METADATA_VOLUMES_TOLERATED_KEY);
+        + " and " + DatanodeConfiguration.FAILED_METADATA_VOLUMES_TOLERATED_KEY
+        + " and " + DatanodeConfiguration.FAILED_DB_VOLUMES_TOLERATED_KEY);
     hddsDatanodeStopService.stopService();
   }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
index d80d1e5bca..9e0669c8e2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
@@ -17,18 +17,17 @@
 package org.apache.hadoop.ozone.container.common.states.endpoint;
 
 import java.io.IOException;
-import java.util.Map;
 import java.util.concurrent.Callable;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
-import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
-import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures.SchemaV3;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 
@@ -78,36 +77,17 @@ public class VersionEndpointTask implements
           String scmId = response.getValue(OzoneConsts.SCM_ID);
           String clusterId = response.getValue(OzoneConsts.CLUSTER_ID);
 
-          // Check volumes
-          MutableVolumeSet volumeSet = ozoneContainer.getVolumeSet();
-          volumeSet.writeLock();
-          try {
-            Map<String, StorageVolume> volumeMap = volumeSet.getVolumeMap();
+          Preconditions.checkNotNull(scmId,
+              "Reply from SCM: scmId cannot be null");
+          Preconditions.checkNotNull(clusterId,
+              "Reply from SCM: clusterId cannot be null");
 
-            Preconditions.checkNotNull(scmId,
-                "Reply from SCM: scmId cannot be null");
-            Preconditions.checkNotNull(clusterId,
-                "Reply from SCM: clusterId cannot be null");
-
-            // If version file does not exist
-            // create version file and also set scm ID or cluster ID.
-            for (Map.Entry<String, StorageVolume> entry
-                : volumeMap.entrySet()) {
-              StorageVolume volume = entry.getValue();
-              boolean result = HddsVolumeUtil.checkVolume((HddsVolume) volume,
-                  scmId, clusterId, configuration, LOG);
-              if (!result) {
-                volumeSet.failVolume(volume.getStorageDir().getPath());
-              }
-            }
-            if (volumeSet.getVolumesList().size() == 0) {
-              // All volumes are in inconsistent state
-              throw new DiskOutOfSpaceException(
-                  "All configured Volumes are in Inconsistent State");
-            }
-          } finally {
-            volumeSet.writeUnlock();
+          // Check DbVolumes
+          if (SchemaV3.isFinalizedAndEnabled(configuration)) {
+            checkVolumeSet(ozoneContainer.getDbVolumeSet(), scmId, clusterId);
           }
+          // Check HddsVolumes
+          checkVolumeSet(ozoneContainer.getVolumeSet(), scmId, clusterId);
 
           // Start the container services after getting the version information
           ozoneContainer.start(clusterId);
@@ -129,4 +109,32 @@ public class VersionEndpointTask implements
     }
     return rpcEndPoint.getState();
   }
+
+  private void checkVolumeSet(MutableVolumeSet volumeSet,
+      String scmId, String clusterId) throws DiskOutOfSpaceException {
+    if (volumeSet == null) {
+      return;
+    }
+
+    volumeSet.writeLock();
+    try {
+      // If version file does not exist
+      // create version file and also set scm ID or cluster ID.
+      for (StorageVolume volume : volumeSet.getVolumeMap().values()) {
+        boolean result = StorageVolumeUtil.checkVolume(volume,
+            scmId, clusterId, configuration, LOG,
+            ozoneContainer.getDbVolumeSet());
+        if (!result) {
+          volumeSet.failVolume(volume.getStorageDir().getPath());
+        }
+      }
+      if (volumeSet.getVolumesList().size() == 0) {
+        // All volumes are in inconsistent state
+        throw new DiskOutOfSpaceException(
+            "All configured Volumes are in Inconsistent State");
+      }
+    } finally {
+      volumeSet.writeUnlock();
+    }
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/DatanodeStoreCache.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/DatanodeStoreCache.java
index 646fc2a2f3..0f7baa6317 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/DatanodeStoreCache.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/DatanodeStoreCache.java
@@ -58,7 +58,17 @@ public final class DatanodeStoreCache {
   }
 
   public void removeDB(String containerDBPath) {
-    datanodeStoreMap.remove(containerDBPath);
+    RawDB db = datanodeStoreMap.remove(containerDBPath);
+    if (db == null) {
+      LOG.debug("DB {} already removed", containerDBPath);
+      return;
+    }
+
+    try {
+      db.getStore().stop();
+    } catch (Exception e) {
+      LOG.error("Stop DatanodeStore: {} failed", containerDBPath, e);
+    }
   }
 
   public void shutdownCache() {
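
With the change above, removeDB is responsible for stopping the evicted
store rather than silently dropping the map entry, so no open db handle is
leaked. The same remove-then-close pattern in a standalone JDK sketch
(hypothetical Closeable store type, not the Ozone classes):

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public final class CloseOnRemoveCache<K, V extends Closeable> {
      private final ConcurrentMap<K, V> map = new ConcurrentHashMap<>();

      public void put(K key, V value) {
        map.put(key, value);
      }

      /** Remove the entry and close it, so no open handle is leaked. */
      public void remove(K key) {
        V value = map.remove(key);
        if (value == null) {
          return; // already removed
        }
        try {
          value.close();
        } catch (IOException e) {
          // Log and continue; eviction should not fail the caller.
          System.err.println("Failed to close store for " + key + ": " + e);
        }
      }
    }
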
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
index 6a38080214..0e1414f998 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
@@ -18,21 +18,22 @@
 
 package org.apache.hadoop.ozone.container.common.utils;
 
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
-import org.apache.hadoop.ozone.container.common.HDDSVolumeLayoutVersion;
+import org.apache.hadoop.ozone.container.common.volume.DbVolume;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
-import org.apache.hadoop.util.Time;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
 import org.slf4j.Logger;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Properties;
-import java.util.UUID;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil.onFailure;
 
 /**
  * A util class for {@link HddsVolume}.
@@ -43,17 +44,6 @@ public final class HddsVolumeUtil {
   private HddsVolumeUtil() {
   }
 
-  private static final String VERSION_FILE   = "VERSION";
-  private static final String STORAGE_ID_PREFIX = "DS-";
-
-  public static File getVersionFile(File rootDir) {
-    return new File(rootDir, VERSION_FILE);
-  }
-
-  public static String generateUuid() {
-    return STORAGE_ID_PREFIX + UUID.randomUUID();
-  }
-
   /**
    * Get hddsRoot from volume root. If volumeRoot points to hddsRoot, it is
    * returned as is.
@@ -71,167 +61,65 @@ public final class HddsVolumeUtil {
   }
 
   /**
-   * Returns storageID if it is valid. Throws an exception otherwise.
-   */
-  @VisibleForTesting
-  public static String getStorageID(Properties props, File versionFile)
-      throws InconsistentStorageStateException {
-    return getProperty(props, OzoneConsts.STORAGE_ID, versionFile);
-  }
-
-  /**
-   * Returns clusterID if it is valid. It should match the clusterID from the
-   * Datanode. Throws an exception otherwise.
-   */
-  @VisibleForTesting
-  public static String getClusterID(Properties props, File versionFile,
-      String clusterID) throws InconsistentStorageStateException {
-    String cid = getProperty(props, OzoneConsts.CLUSTER_ID, versionFile);
-
-    if (clusterID == null) {
-      return cid;
-    }
-    if (!clusterID.equals(cid)) {
-      throw new InconsistentStorageStateException("Mismatched " +
-          "ClusterIDs. Version File : " + versionFile + " has clusterID: " +
-          cid + " and Datanode has clusterID: " + clusterID);
-    }
-    return cid;
-  }
-
-  /**
-   * Returns datanodeUuid if it is valid. It should match the UUID of the
-   * Datanode. Throws an exception otherwise.
-   */
-  @VisibleForTesting
-  public static String getDatanodeUUID(Properties props, File versionFile,
-      String datanodeUuid)
-      throws InconsistentStorageStateException {
-    String datanodeID = getProperty(props, OzoneConsts.DATANODE_UUID,
-        versionFile);
-
-    if (datanodeUuid != null && !datanodeUuid.equals(datanodeID)) {
-      throw new InconsistentStorageStateException("Mismatched " +
-          "DatanodeUUIDs. Version File : " + versionFile + " has datanodeUuid: "
-          + datanodeID + " and Datanode has datanodeUuid: " + datanodeUuid);
-    }
-    return datanodeID;
-  }
-
-  /**
-   * Returns creationTime if it is valid. Throws an exception otherwise.
+   * Initialize the db instance; rocksdb will load the existing instance
+   * if present and create a new one if not.
+   * @param containerDBPath
+   * @param conf
+   * @throws IOException
    */
-  @VisibleForTesting
-  public static long getCreationTime(Properties props, File versionFile)
-      throws InconsistentStorageStateException {
-    String cTimeStr = getProperty(props, OzoneConsts.CTIME, versionFile);
-
-    long cTime = Long.parseLong(cTimeStr);
-    long currentTime = Time.now();
-    if (cTime > currentTime || cTime < 0) {
-      throw new InconsistentStorageStateException("Invalid Creation time in " +
-          "Version File : " + versionFile + " - " + cTime + ". Current system" +
-          " time is " + currentTime);
-    }
-    return cTime;
+  public static void initPerDiskDBStore(String containerDBPath,
+      ConfigurationSource conf) throws IOException {
+    DatanodeStore store = BlockUtils.getUncachedDatanodeStore(containerDBPath,
+        OzoneConsts.SCHEMA_V3, conf, false);
+    BlockUtils.addDB(store, containerDBPath, conf, OzoneConsts.SCHEMA_V3);
   }
 
   /**
-   * Returns layOutVersion if it is valid. Throws an exception otherwise.
+   * Load already formatted db instances for all HddsVolumes.
+   * @param hddsVolumeSet
+   * @param dbVolumeSet
+   * @param logger
    */
-  @VisibleForTesting
-  public static int getLayOutVersion(Properties props, File versionFile) throws
-      InconsistentStorageStateException {
-    String lvStr = getProperty(props, OzoneConsts.LAYOUTVERSION, versionFile);
-
-    int lv = Integer.parseInt(lvStr);
-    if (HDDSVolumeLayoutVersion.getLatestVersion().getVersion() != lv) {
-      throw new InconsistentStorageStateException("Invalid layOutVersion. " +
-          "Version file has layOutVersion as " + lv + " and latest Datanode " +
-          "layOutVersion is " +
-          HDDSVolumeLayoutVersion.getLatestVersion().getVersion());
-    }
-    return lv;
-  }
-
-  public static String getProperty(
-      Properties props, String propName, File
-      versionFile
-  )
-      throws InconsistentStorageStateException {
-    String value = props.getProperty(propName);
-    if (StringUtils.isBlank(value)) {
-      throw new InconsistentStorageStateException("Invalid " + propName +
-          ". Version File : " + versionFile + " has null or empty " + propName);
+  public static void loadAllHddsVolumeDbStore(MutableVolumeSet hddsVolumeSet,
+      MutableVolumeSet dbVolumeSet, Logger logger) {
+    // Scan subdirs under the db volumes and build a one-to-one map
+    // from each HddsVolume to its DbVolume.
+    mapDbVolumesToDataVolumesIfNeeded(hddsVolumeSet, dbVolumeSet);
+
+    for (HddsVolume volume : StorageVolumeUtil.getHddsVolumesList(
+        hddsVolumeSet.getVolumesList())) {
+      try {
+        volume.loadDbStore();
+      } catch (IOException e) {
+        onFailure(volume);
+        if (logger != null) {
+          logger.error("Load db store for HddsVolume {} failed",
+              volume.getStorageDir().getAbsolutePath(), e);
+        }
+      }
     }
-    return value;
   }
 
-  /**
-   * Check Volume is in consistent state or not.
-   * Prior to SCM HA, volumes used the format {@code <volume>/hdds/<scm-id>}.
-   * Post SCM HA, new volumes will use the format {@code <volume>/hdds/<cluster
-   * -id>}.
-   * Existing volumes using SCM ID would have been reformatted to have {@code
-   * <volume>/hdds/<cluster-id>} as a symlink pointing to {@code <volume
-   * >/hdds/<scm-id>}.
-   *
-   * @param hddsVolume
-   * @param clusterId
-   * @param logger
-   * @return true - if volume is in consistent state, otherwise false.
-   */
-  public static boolean checkVolume(HddsVolume hddsVolume, String scmId,
-      String clusterId, ConfigurationSource conf, Logger logger) {
-    File hddsRoot = hddsVolume.getHddsRootDir();
-    String volumeRoot = hddsRoot.getPath();
-    File clusterDir = new File(hddsRoot, clusterId);
-
-    try {
-      hddsVolume.format(clusterId);
-    } catch (IOException ex) {
-      logger.error("Error during formatting volume {}.",
-          volumeRoot, ex);
-      return false;
+  private static void mapDbVolumesToDataVolumesIfNeeded(
+      MutableVolumeSet hddsVolumeSet, MutableVolumeSet dbVolumeSet) {
+    if (dbVolumeSet == null || dbVolumeSet.getVolumesList().isEmpty()) {
+      return;
     }
 
-    File[] hddsFiles = hddsRoot.listFiles();
-
-    if (hddsFiles == null) {
-      // This is the case for IOException, where listFiles returns null.
-      // So, we fail the volume.
-      return false;
-    } else if (hddsFiles.length == 1) {
-      // DN started for first time or this is a newly added volume.
-      // The one file is the version file.
-      // So we create cluster ID directory, or SCM ID directory if
-      // pre-finalized for SCM HA.
-      // Either the SCM ID or cluster ID will be used in naming the
-      // volume's subdirectory, depending on the datanode's layout version.
-      String id = VersionedDatanodeFeatures.ScmHA.chooseContainerPathID(conf,
-          scmId, clusterId);
-      File idDir = new File(hddsRoot, id);
-      if (!idDir.mkdir()) {
-        logger.error("Unable to create ID directory {} for datanode.", idDir);
-        return false;
-      }
-      return true;
-    } else if (hddsFiles.length == 2) {
-      // If we are finalized for SCM HA and there is no cluster ID directory,
-      // the volume may have been unhealthy during finalization and been
-      // skipped. Create cluster ID symlink now.
-      // Else, We are still pre-finalized.
-      // The existing directory should be left for backwards compatibility.
-      return VersionedDatanodeFeatures.ScmHA.
-          upgradeVolumeIfNeeded(hddsVolume, clusterId);
-    } else {
-      if (!clusterDir.exists()) {
-        logger.error("Volume {} is in an inconsistent state. {} files found " +
-            "but cluster ID directory {} does not exist.", volumeRoot,
-            hddsFiles.length, clusterDir);
-        return false;
-      }
-      return true;
-    }
+    List<HddsVolume> hddsVolumes = StorageVolumeUtil.getHddsVolumesList(
+        hddsVolumeSet.getVolumesList());
+    List<DbVolume> dbVolumes = StorageVolumeUtil.getDbVolumesList(
+        dbVolumeSet.getVolumesList());
+    Map<String, DbVolume> globalDbVolumeMap = new HashMap<>();
+
+    // build a datanode global map of storageID -> dbVolume
+    dbVolumes.forEach(dbVolume ->
+        dbVolume.getHddsVolumeIDs().forEach(storageID ->
+            globalDbVolumeMap.put(storageID, dbVolume)));
+
+    // map each hddsVolume to a dbVolume
+    hddsVolumes.forEach(hddsVolume ->
+        hddsVolume.setDbVolume(globalDbVolumeMap.getOrDefault(
+            hddsVolume.getStorageID(), null)));
   }
 }
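
The "load existing or create new" behavior described in the
initPerDiskDBStore javadoc above is, at the rocksdb level, an open with
createIfMissing. A standalone sketch against the plain org.rocksdb API
(the real per-disk store is opened with column families through
BlockUtils; this only illustrates the open-or-create semantics, and the
path is hypothetical):

    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    public final class OpenOrCreateDb {
      static {
        RocksDB.loadLibrary();
      }

      public static void main(String[] args) throws RocksDBException {
        String containerDBPath = "/tmp/container.db";
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, containerDBPath)) {
          // Loads the existing instance if present, creates one if not.
          System.out.println("Opened db at " + containerDBPath);
        }
      }
    }
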
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/StorageVolumeUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/StorageVolumeUtil.java
index 104dbac78b..0050038bb8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/StorageVolumeUtil.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/StorageVolumeUtil.java
@@ -18,12 +18,26 @@
 
 package org.apache.hadoop.ozone.container.common.utils;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
+import org.apache.hadoop.ozone.container.common.HDDSVolumeLayoutVersion;
+import org.apache.hadoop.ozone.container.common.volume.DbVolume;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 /**
@@ -31,6 +45,9 @@ import java.util.stream.Collectors;
  */
 public final class StorageVolumeUtil {
 
+  private static final String VERSION_FILE   = "VERSION";
+  private static final String STORAGE_ID_PREFIX = "DS-";
+
   private StorageVolumeUtil() {
   }
 
@@ -48,4 +65,189 @@ public final class StorageVolumeUtil {
     return volumes.stream().
         map(v -> (HddsVolume) v).collect(Collectors.toList());
   }
+
+  public static List<DbVolume> getDbVolumesList(
+      List<StorageVolume> volumes) {
+    return volumes.stream().
+        map(v -> (DbVolume) v).collect(Collectors.toList());
+  }
+
+  public static File getVersionFile(File rootDir) {
+    return new File(rootDir, VERSION_FILE);
+  }
+
+  public static String generateUuid() {
+    return STORAGE_ID_PREFIX + UUID.randomUUID();
+  }
+
+  /**
+   * Returns storageID if it is valid. Throws an exception otherwise.
+   */
+  @VisibleForTesting
+  public static String getStorageID(Properties props, File versionFile)
+      throws InconsistentStorageStateException {
+    return getProperty(props, OzoneConsts.STORAGE_ID, versionFile);
+  }
+
+  /**
+   * Returns clusterID if it is valid. It should match the clusterID from the
+   * Datanode. Throws an exception otherwise.
+   */
+  @VisibleForTesting
+  public static String getClusterID(Properties props, File versionFile,
+      String clusterID) throws InconsistentStorageStateException {
+    String cid = getProperty(props, OzoneConsts.CLUSTER_ID, versionFile);
+
+    if (clusterID == null) {
+      return cid;
+    }
+    if (!clusterID.equals(cid)) {
+      throw new InconsistentStorageStateException("Mismatched " +
+          "ClusterIDs. Version File : " + versionFile + " has clusterID: " +
+          cid + " and Datanode has clusterID: " + clusterID);
+    }
+    return cid;
+  }
+
+  /**
+   * Returns datanodeUuid if it is valid. It should match the UUID of the
+   * Datanode. Throws an exception otherwise.
+   */
+  @VisibleForTesting
+  public static String getDatanodeUUID(Properties props, File versionFile,
+      String datanodeUuid)
+      throws InconsistentStorageStateException {
+    String datanodeID = getProperty(props, OzoneConsts.DATANODE_UUID,
+        versionFile);
+
+    if (datanodeUuid != null && !datanodeUuid.equals(datanodeID)) {
+      throw new InconsistentStorageStateException("Mismatched " +
+          "DatanodeUUIDs. Version File : " + versionFile + " has datanodeUuid: "
+          + datanodeID + " and Datanode has datanodeUuid: " + datanodeUuid);
+    }
+    return datanodeID;
+  }
+
+  /**
+   * Returns creationTime if it is valid. Throws an exception otherwise.
+   */
+  @VisibleForTesting
+  public static long getCreationTime(Properties props, File versionFile)
+      throws InconsistentStorageStateException {
+    String cTimeStr = getProperty(props, OzoneConsts.CTIME, versionFile);
+
+    long cTime = Long.parseLong(cTimeStr);
+    long currentTime = Time.now();
+    if (cTime > currentTime || cTime < 0) {
+      throw new InconsistentStorageStateException("Invalid Creation time in " +
+          "Version File : " + versionFile + " - " + cTime + ". Current system" +
+          " time is " + currentTime);
+    }
+    return cTime;
+  }
+
+  /**
+   * Returns layOutVersion if it is valid. Throws an exception otherwise.
+   */
+  @VisibleForTesting
+  public static int getLayOutVersion(Properties props, File versionFile) throws
+      InconsistentStorageStateException {
+    String lvStr = getProperty(props, OzoneConsts.LAYOUTVERSION, versionFile);
+
+    int lv = Integer.parseInt(lvStr);
+    if (HDDSVolumeLayoutVersion.getLatestVersion().getVersion() != lv) {
+      throw new InconsistentStorageStateException("Invalid layOutVersion. " +
+          "Version file has layOutVersion as " + lv + " and latest Datanode " +
+          "layOutVersion is " +
+          HDDSVolumeLayoutVersion.getLatestVersion().getVersion());
+    }
+    return lv;
+  }
+
+  public static String getProperty(
+      Properties props, String propName, File
+      versionFile
+  )
+      throws InconsistentStorageStateException {
+    String value = props.getProperty(propName);
+    if (StringUtils.isBlank(value)) {
+      throw new InconsistentStorageStateException("Invalid " + propName +
+          ". Version File : " + versionFile + " has null or empty " + propName);
+    }
+    return value;
+  }
+
+  /**
+   * Check Volume is in consistent state or not.
+   * Prior to SCM HA, volumes used the format {@code <volume>/hdds/<scm-id>}.
+   * Post SCM HA, new volumes will use the format {@code <volume>/hdds/<cluster
+   * -id>}.
+   * Existing volumes using SCM ID would have been reformatted to have {@code
+   * <volume>/hdds/<cluster-id>} as a symlink pointing to {@code <volume
+   * >/hdds/<scm-id>}.
+   *
+   * @param volume
+   * @param scmId
+   * @param clusterId
+   * @param conf
+   * @param logger
+   * @param dbVolumeSet
+   * @return true - if volume is in consistent state, otherwise false.
+   */
+  public static boolean checkVolume(StorageVolume volume, String scmId,
+      String clusterId, ConfigurationSource conf, Logger logger,
+      MutableVolumeSet dbVolumeSet) {
+    File volumeRoot = volume.getStorageDir();
+    String volumeRootPath = volumeRoot.getPath();
+    File clusterDir = new File(volumeRoot, clusterId);
+
+    try {
+      volume.format(clusterId);
+    } catch (IOException ex) {
+      logger.error("Error during formatting volume {}.",
+          volumeRootPath, ex);
+      return false;
+    }
+
+    File[] rootFiles = volumeRoot.listFiles();
+
+    if (rootFiles == null) {
+      // This is the case for IOException, where listFiles returns null.
+      // So, we fail the volume.
+      return false;
+    } else if (rootFiles.length == 1) {
+      // DN started for first time or this is a newly added volume.
+      // The one file is the version file.
+      // So we create cluster ID directory, or SCM ID directory if
+      // pre-finalized for SCM HA.
+      // Either the SCM ID or cluster ID will be used in naming the
+      // volume's subdirectory, depending on the datanode's layout version.
+      String id = VersionedDatanodeFeatures.ScmHA.chooseContainerPathID(conf,
+          scmId, clusterId);
+      try {
+        volume.createWorkingDir(id, dbVolumeSet);
+      } catch (IOException e) {
+        logger.error("Prepare working dir failed for volume {}.",
+            volumeRootPath, e);
+        return false;
+      }
+      return true;
+    } else if (rootFiles.length == 2) {
+      // If we are finalized for SCM HA and there is no cluster ID directory,
+      // the volume may have been unhealthy during finalization and been
+      // skipped. Create cluster ID symlink now.
+      // Else, we are still pre-finalized.
+      // The existing directory should be left for backwards compatibility.
+      return VersionedDatanodeFeatures.ScmHA.
+          upgradeVolumeIfNeeded(volume, clusterId);
+    } else {
+      if (!clusterDir.exists()) {
+        logger.error("Volume {} is in an inconsistent state. {} files found " +
+            "but cluster ID directory {} does not exist.", volumeRootPath,
+            rootFiles.length, clusterDir);
+        return false;
+      }
+      return true;
+    }
+  }
 }
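
The helpers consolidated here (getStorageID, getClusterID, getDatanodeUUID,
getCreationTime, getLayOutVersion) all parse a java.util.Properties view of
the per-volume VERSION file. A JDK-only round-trip sketch of the file
format they assume (key names and values are illustrative; the real code
uses the OzoneConsts constants referenced above):

    import java.io.File;
    import java.io.FileReader;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.util.Properties;
    import java.util.UUID;

    public final class VersionFileExample {
      public static void main(String[] args) throws IOException {
        File versionFile = new File("/tmp/VERSION"); // hypothetical location

        Properties props = new Properties();
        props.setProperty("storageID", "DS-" + UUID.randomUUID());
        props.setProperty("clusterID", "CID-example");
        props.setProperty("datanodeUuid", "example-datanode-uuid");
        props.setProperty("cTime", String.valueOf(System.currentTimeMillis()));
        props.setProperty("layOutVersion", "1");
        try (FileWriter out = new FileWriter(versionFile)) {
          props.store(out, "Sample datanode VERSION file");
        }

        Properties read = new Properties();
        try (FileReader in = new FileReader(versionFile)) {
          read.load(in);
        }
        // getProperty-style validation rejects null or blank values.
        String clusterID = read.getProperty("clusterID");
        if (clusterID == null || clusterID.trim().isEmpty()) {
          throw new IOException("Invalid clusterID in " + versionFile);
        }
        System.out.println("clusterID = " + clusterID);
      }
    }
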
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/DbVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/DbVolume.java
new file mode 100644
index 0000000000..bd593d38da
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/DbVolume.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.volume;
+
+import org.apache.hadoop.ozone.container.common.utils.DatanodeStoreCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_NAME;
+
+/**
+ * DbVolume represents a volume in a datanode holding db instances
+ * for multiple HddsVolumes. One HddsVolume will have one subdirectory
+ * for its db instance under a DbVolume.
+ *
+ * For example:
+ *   Say we have an SSD device mounted at /ssd1, then the DbVolume
+ *   root directory is /ssd1/db, and each db instance gets a
+ *   subdirectory like
+ *   /ssd1/db/<clusterID>/<storageID>/container.db.
+ */
+public class DbVolume extends StorageVolume {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DbVolume.class);
+
+  public static final String DB_VOLUME_DIR = "db";
+
+  /**
+   * Records all HddsVolumes that put their db instances under this DbVolume.
+   * Map: HddsVolume.StorageID -> DBStorePath
+   */
+  private final Map<String, String> hddsDbStorePathMap;
+
+  protected DbVolume(Builder b) throws IOException {
+    super(b);
+
+    this.hddsDbStorePathMap = new HashMap<>();
+    if (!b.getFailedVolume()) {
+      LOG.info("Creating DbVolume: {} of storage type : {} capacity : {}",
+          getStorageDir(), b.getStorageType(), getVolumeInfo().getCapacity());
+      initialize();
+    }
+  }
+
+  @Override
+  protected void initialize() throws IOException {
+    super.initialize();
+    scanForDbStorePaths();
+  }
+
+  @Override
+  public void failVolume() {
+    super.failVolume();
+    closeAllDbStore();
+  }
+
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    closeAllDbStore();
+  }
+
+  public void addHddsDbStorePath(String id, String dbPath) {
+    hddsDbStorePathMap.put(id, dbPath);
+  }
+
+  public Set<String> getHddsVolumeIDs() {
+    return hddsDbStorePathMap.keySet();
+  }
+
+  /**
+   * Builder class for DbVolume.
+   */
+  public static class Builder extends StorageVolume.Builder<Builder> {
+
+    public Builder(String volumeRootStr) {
+      super(volumeRootStr, DB_VOLUME_DIR);
+    }
+
+    @Override
+    public Builder getThis() {
+      return this;
+    }
+
+    public DbVolume build() throws IOException {
+      return new DbVolume(this);
+    }
+  }
+
+  private void scanForDbStorePaths() throws IOException {
+    // Not formatted yet
+    if (!getStorageState().equals(VolumeState.NORMAL)) {
+      return;
+    }
+
+    // scan subdirectories for db instances mapped to HddsVolumes
+    File clusterIdDir = new File(getStorageDir(), getClusterID());
+    // Working dir not prepared yet
+    if (!clusterIdDir.exists()) {
+      return;
+    }
+
+    File[] subdirs = clusterIdDir.listFiles(File::isDirectory);
+    if (subdirs == null) {
+      throw new IOException("Failed to do listFiles for " +
+          clusterIdDir.getAbsolutePath());
+    }
+    hddsDbStorePathMap.clear();
+
+    for (File subdir : subdirs) {
+      String storageID = subdir.getName();
+      File storageIdDir = new File(clusterIdDir, subdir.getName());
+      hddsDbStorePathMap.put(storageID, new File(storageIdDir,
+          CONTAINER_DB_NAME).getAbsolutePath());
+    }
+  }
+
+  private void closeAllDbStore() {
+    // Here we check clusterID directly, because the state
+    // may not be NORMAL; it could be FAILED.
+    if (getClusterID() == null) {
+      return;
+    }
+
+    File clusterIdDir = new File(getStorageDir(), getClusterID());
+    if (clusterIdDir.exists()) {
+      for (String containerDBPath : hddsDbStorePathMap.values()) {
+        DatanodeStoreCache.getInstance().removeDB(containerDBPath);
+      }
+    }
+  }
+}
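
Given the layout in the class javadoc
(/ssd1/db/<clusterID>/<storageID>/container.db), the db path for an
HddsVolume is a straightforward join, which scanForDbStorePaths
reconstructs on restart. A JDK-only sketch with hypothetical IDs:

    import java.io.File;

    public final class DbPathLayout {
      public static void main(String[] args) {
        // Hypothetical values for illustration.
        File dbVolumeRoot = new File("/ssd1/db");
        String clusterID = "CID-example";
        String hddsVolumeStorageID = "DS-example";

        File clusterIdDir = new File(dbVolumeRoot, clusterID);
        File storageIdDir = new File(clusterIdDir, hddsVolumeStorageID);
        File containerDb = new File(storageIdDir, "container.db");

        // Prints /ssd1/db/CID-example/DS-example/container.db
        System.out.println(containerDb.getAbsolutePath());
      }
    }
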
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolumeFactory.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/DbVolumeFactory.java
similarity index 64%
copy from hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolumeFactory.java
copy to hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/DbVolumeFactory.java
index b83cb3883c..9aa4cefcdf 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolumeFactory.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/DbVolumeFactory.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.ozone.container.common.volume;
 
 import org.apache.hadoop.fs.StorageType;
@@ -25,31 +24,37 @@ import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory;
 import java.io.IOException;
 
 /**
- * A factory class for MetadataVolume.
+ * A factory class for DbVolume.
  */
-public class MetadataVolumeFactory extends StorageVolumeFactory {
+public class DbVolumeFactory extends StorageVolumeFactory {
 
-  public MetadataVolumeFactory(ConfigurationSource conf,
-      SpaceUsageCheckFactory usageCheckFactory, MutableVolumeSet volumeSet) {
-    super(conf, usageCheckFactory, volumeSet);
+  public DbVolumeFactory(ConfigurationSource conf,
+      SpaceUsageCheckFactory usageCheckFactory, MutableVolumeSet volumeSet,
+      String datanodeUuid, String clusterID) {
+    super(conf, usageCheckFactory, volumeSet, datanodeUuid, clusterID);
   }
 
   @Override
   StorageVolume createVolume(String locationString, StorageType storageType)
       throws IOException {
-    MetadataVolume.Builder volumeBuilder =
-        new MetadataVolume.Builder(locationString)
-            .conf(getConf())
-            .usageCheckFactory(getUsageCheckFactory())
-            .storageType(storageType)
-            .volumeSet(getVolumeSet());
-    return volumeBuilder.build();
+    DbVolume.Builder volumeBuilder = new DbVolume.Builder(locationString)
+        .conf(getConf())
+        .datanodeUuid(getDatanodeUuid())
+        .clusterID(getClusterID())
+        .usageCheckFactory(getUsageCheckFactory())
+        .storageType(storageType)
+        .volumeSet(getVolumeSet());
+    DbVolume volume = volumeBuilder.build();
+
+    checkAndSetClusterID(volume.getClusterID());
+
+    return volume;
   }
 
   @Override
   StorageVolume createFailedVolume(String locationString) throws IOException {
-    MetadataVolume.Builder volumeBuilder =
-        new MetadataVolume.Builder(locationString)
+    DbVolume.Builder volumeBuilder =
+        new DbVolume.Builder(locationString)
             .failedVolume(true);
     return volumeBuilder.build();
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
index a1e41ff9d7..513882eb9e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
@@ -18,25 +18,24 @@
 
 package org.apache.hadoop.ozone.container.common.volume;
 
-import static org.apache.hadoop.ozone.container.common.HDDSVolumeLayoutVersion.getLatestVersion;
-
 import java.io.File;
 import java.io.IOException;
-import java.util.Properties;
-import java.util.UUID;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
-import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
-import org.apache.hadoop.ozone.container.common.helpers.DatanodeVersionFile;
-import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
-import org.apache.hadoop.util.Time;
 
-import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.container.common.utils.DatanodeStoreCache;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
+import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures.SchemaV3;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_NAME;
+import static org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil.initPerDiskDBStore;
+
 /**
  * HddsVolume represents volume in a datanode. {@link MutableVolumeSet}
  * maintains a list of HddsVolumes, one for each volume in the Datanode.
@@ -49,12 +48,6 @@ import org.slf4j.LoggerFactory;
  * <p>{@literal ../hdds/<<clusterUuid>>/current/<<containerDir>>/<<containerID
  * >>/<<dataDir>>}
  * <p>
- * Each hdds volume has its own VERSION file. The hdds volume will have one
- * clusterUuid directory for each SCM it is a part of (currently only one SCM is
- * supported).
- *
- * During DN startup, if the VERSION file exists, we verify that the
- * clusterID in the version file matches the clusterID from SCM.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -65,23 +58,22 @@ public class HddsVolume extends StorageVolume {
 
   public static final String HDDS_VOLUME_DIR = "hdds";
 
-  private VolumeState state;
   private final VolumeIOStats volumeIOStats;
 
-  // VERSION file properties
-  private String storageID;       // id of the file system
-  private String clusterID;       // id of the cluster
-  private String datanodeUuid;    // id of the DataNode
-  private long cTime;             // creation time of the file system state
-  private int layoutVersion;      // layout version of the storage data
   private final AtomicLong committedBytes; // till Open containers become full
 
+  // The dedicated DbVolume on which the db instance of this HddsVolume resides.
+  // This is optional; if null, the db instance resides on this HddsVolume.
+  private DbVolume dbVolume;
+  // The subdirectory with storageID as its name, used to build the
+  // container db path. This is initialized only once together with dbVolume,
+  // and stored as a member to prevent spawning lots of File objects.
+  private File dbParentDir;
+
   /**
    * Builder for HddsVolume.
    */
   public static class Builder extends StorageVolume.Builder<Builder> {
-    private String datanodeUuid;
-    private String clusterID;
 
     public Builder(String volumeRootStr) {
       super(volumeRootStr, HDDS_VOLUME_DIR);
@@ -92,16 +84,6 @@ public class HddsVolume extends StorageVolume {
       return this;
     }
 
-    public Builder datanodeUuid(String datanodeUUID) {
-      this.datanodeUuid = datanodeUUID;
-      return this;
-    }
-
-    public Builder clusterID(String cid) {
-      this.clusterID = cid;
-      return this;
-    }
-
     public HddsVolume build() throws IOException {
       return new HddsVolume(this);
     }
@@ -111,9 +93,6 @@ public class HddsVolume extends StorageVolume {
     super(b);
 
     if (!b.getFailedVolume()) {
-      this.state = VolumeState.NOT_INITIALIZED;
-      this.clusterID = b.clusterID;
-      this.datanodeUuid = b.datanodeUuid;
       this.volumeIOStats = new VolumeIOStats(b.getVolumeRootStr());
       this.committedBytes = new AtomicLong(0);
 
@@ -125,234 +104,49 @@ public class HddsVolume extends StorageVolume {
       // Builder is called with failedVolume set, so create a failed volume
       // HddsVolume Object.
       volumeIOStats = null;
-      storageID = UUID.randomUUID().toString();
-      state = VolumeState.FAILED;
       committedBytes = null;
     }
 
   }
 
-  /**
-   * Initializes the volume.
-   * Creates the Version file if not present,
-   * otherwise returns with IOException.
-   * @throws IOException
-   */
-  private void initialize() throws IOException {
-    VolumeState intialVolumeState = analyzeVolumeState();
-    switch (intialVolumeState) {
-    case NON_EXISTENT:
-      // Root directory does not exist. Create it.
-      if (!getStorageDir().mkdirs()) {
-        throw new IOException("Cannot create directory " + getStorageDir());
-      }
-      setState(VolumeState.NOT_FORMATTED);
-      createVersionFile();
-      break;
-    case NOT_FORMATTED:
-      // Version File does not exist. Create it.
-      createVersionFile();
-      break;
-    case NOT_INITIALIZED:
-      // Version File exists. Verify its correctness and update property fields.
-      readVersionFile();
-      setState(VolumeState.NORMAL);
-      break;
-    case INCONSISTENT:
-      // Volume Root is in an inconsistent state. Skip loading this volume.
-      throw new IOException("Volume is in an " + VolumeState.INCONSISTENT +
-          " state. Skipped loading volume: " + getStorageDir().getPath());
-    default:
-      throw new IOException("Unrecognized initial state : " +
-          intialVolumeState + "of volume : " + getStorageDir());
-    }
-  }
-
-  private VolumeState analyzeVolumeState() {
-    if (!getStorageDir().exists()) {
-      // Volume Root does not exist.
-      return VolumeState.NON_EXISTENT;
-    }
-    if (!getStorageDir().isDirectory()) {
-      // Volume Root exists but is not a directory.
-      LOG.warn("Volume {} exists but is not a directory,"
-          + " current volume state: {}.",
-          getStorageDir().getPath(), VolumeState.INCONSISTENT);
-      return VolumeState.INCONSISTENT;
-    }
-    File[] files = getStorageDir().listFiles();
-    if (files == null || files.length == 0) {
-      // Volume Root exists and is empty.
-      return VolumeState.NOT_FORMATTED;
-    }
-    if (!getVersionFile().exists()) {
-      // Volume Root is non empty but VERSION file does not exist.
-      LOG.warn("VERSION file does not exist in volume {},"
-          + " current volume state: {}.",
-          getStorageDir().getPath(), VolumeState.INCONSISTENT);
-      return VolumeState.INCONSISTENT;
-    }
-    // Volume Root and VERSION file exist.
-    return VolumeState.NOT_INITIALIZED;
-  }
-
-  public void format(String cid) throws IOException {
-    Preconditions.checkNotNull(cid, "clusterID cannot be null while " +
-        "formatting Volume");
-    this.clusterID = cid;
-    initialize();
-  }
-
-  /**
-   * Create Version File and write property fields into it.
-   * @throws IOException
-   */
-  private void createVersionFile() throws IOException {
-    this.storageID = HddsVolumeUtil.generateUuid();
-    this.cTime = Time.now();
-    this.layoutVersion = getLatestVersion().getVersion();
-
-    if (this.clusterID == null || datanodeUuid == null) {
-      // HddsDatanodeService does not have the cluster information yet. Wait
-      // for registration with SCM.
-      LOG.debug("ClusterID not available. Cannot format the volume {}",
-          getStorageDir().getPath());
-      setState(VolumeState.NOT_FORMATTED);
-    } else {
-      // Write the version file to disk.
-      writeVersionFile();
-      setState(VolumeState.NORMAL);
-    }
-  }
-
-  private void writeVersionFile() throws IOException {
-    Preconditions.checkNotNull(this.storageID,
-        "StorageID cannot be null in Version File");
-    Preconditions.checkNotNull(this.clusterID,
-        "ClusterID cannot be null in Version File");
-    Preconditions.checkNotNull(this.datanodeUuid,
-        "DatanodeUUID cannot be null in Version File");
-    Preconditions.checkArgument(this.cTime > 0,
-        "Creation Time should be positive");
-    Preconditions.checkArgument(this.layoutVersion ==
-            getLatestVersion().getVersion(),
-        "Version File should have the latest LayOutVersion");
-
-    File versionFile = getVersionFile();
-    LOG.debug("Writing Version file to disk, {}", versionFile);
-
-    DatanodeVersionFile dnVersionFile = new DatanodeVersionFile(this.storageID,
-        this.clusterID, this.datanodeUuid, this.cTime, this.layoutVersion);
-    dnVersionFile.createVersionFile(versionFile);
-  }
+  @Override
+  public void createWorkingDir(String workingDirName,
+      MutableVolumeSet dbVolumeSet) throws IOException {
+    super.createWorkingDir(workingDirName, dbVolumeSet);
 
-  /**
-   * Read Version File and update property fields.
-   * Get common storage fields.
-   * Should be overloaded if additional fields need to be read.
-   *
-   * @throws IOException on error
-   */
-  private void readVersionFile() throws IOException {
-    File versionFile = getVersionFile();
-    Properties props = DatanodeVersionFile.readFrom(versionFile);
-    if (props.isEmpty()) {
-      throw new InconsistentStorageStateException(
-          "Version file " + versionFile + " is missing");
+    if (SchemaV3.isFinalizedAndEnabled(getConf())) {
+      createDbStore(dbVolumeSet);
     }
-
-    LOG.debug("Reading Version file from disk, {}", versionFile);
-    this.storageID = HddsVolumeUtil.getStorageID(props, versionFile);
-    this.clusterID = HddsVolumeUtil.getClusterID(props, versionFile,
-        this.clusterID);
-    this.datanodeUuid = HddsVolumeUtil.getDatanodeUUID(props, versionFile,
-        this.datanodeUuid);
-    this.cTime = HddsVolumeUtil.getCreationTime(props, versionFile);
-    this.layoutVersion = HddsVolumeUtil.getLayOutVersion(props, versionFile);
-  }
-
-  private File getVersionFile() {
-    return HddsVolumeUtil.getVersionFile(super.getStorageDir());
   }
 
   public File getHddsRootDir() {
     return super.getStorageDir();
   }
 
-  @Override
-  public String getStorageID() {
-    return storageID;
-  }
-
-  public String getClusterID() {
-    return clusterID;
-  }
-
-  public String getDatanodeUuid() {
-    return datanodeUuid;
-  }
-
-  public long getCTime() {
-    return cTime;
-  }
-
-  public int getLayoutVersion() {
-    return layoutVersion;
-  }
-
-  public VolumeState getStorageState() {
-    return state;
-  }
-
-  public void setState(VolumeState state) {
-    this.state = state;
-  }
-
-  public boolean isFailed() {
-    return (state == VolumeState.FAILED);
-  }
-
   public VolumeIOStats getVolumeIOStats() {
     return volumeIOStats;
   }
 
   @Override
   public void failVolume() {
-    setState(VolumeState.FAILED);
     super.failVolume();
     if (volumeIOStats != null) {
       volumeIOStats.unregister();
     }
+    if (SchemaV3.isFinalizedAndEnabled(getConf())) {
+      closeDbStore();
+    }
   }
 
   @Override
   public void shutdown() {
-    this.state = VolumeState.NON_EXISTENT;
     super.shutdown();
     if (volumeIOStats != null) {
       volumeIOStats.unregister();
     }
-  }
-
-  /**
-   * VolumeState represents the different states a HddsVolume can be in.
-   * NORMAL          =&gt; Volume can be used for storage
-   * FAILED          =&gt; Volume has failed due and can no longer be used for
-   *                    storing containers.
-   * NON_EXISTENT    =&gt; Volume Root dir does not exist
-   * INCONSISTENT    =&gt; Volume Root dir is not empty but VERSION file is
-   *                    missing or Volume Root dir is not a directory
-   * NOT_FORMATTED   =&gt; Volume Root exists but not formatted(no VERSION file)
-   * NOT_INITIALIZED =&gt; VERSION file exists but has not been verified for
-   *                    correctness.
-   */
-  public enum VolumeState {
-    NORMAL,
-    FAILED,
-    NON_EXISTENT,
-    INCONSISTENT,
-    NOT_FORMATTED,
-    NOT_INITIALIZED
+    if (SchemaV3.isFinalizedAndEnabled(getConf())) {
+      closeDbStore();
+    }
   }
 
   /**
@@ -371,4 +165,118 @@ public class HddsVolume extends StorageVolume {
   public long getCommittedBytes() {
     return committedBytes.get();
   }
+
+  public void setDbVolume(DbVolume dbVolume) {
+    this.dbVolume = dbVolume;
+  }
+
+  public DbVolume getDbVolume() {
+    return this.dbVolume;
+  }
+
+  public File getDbParentDir() {
+    return this.dbParentDir;
+  }
+
+  public void loadDbStore() throws IOException {
+    // First DN startup: not registered with SCM yet, so this volume
+    // is not formatted and there is no db store to load yet.
+    if (!getStorageState().equals(VolumeState.NORMAL)) {
+      return;
+    }
+
+    File clusterIdDir = new File(dbVolume == null ?
+        getStorageDir() : dbVolume.getStorageDir(),
+        getClusterID());
+    if (!clusterIdDir.exists()) {
+      throw new IOException("Working dir " + clusterIdDir.getAbsolutePath() +
+          " not created for HddsVolume: " + getStorageDir().getAbsolutePath());
+    }
+
+    File storageIdDir = new File(clusterIdDir, getStorageID());
+    if (!storageIdDir.exists()) {
+      throw new IOException("Db parent dir " + storageIdDir.getAbsolutePath() +
+          " not found for HddsVolume: " + getStorageDir().getAbsolutePath());
+    }
+
+    File containerDBFile = new File(storageIdDir, CONTAINER_DB_NAME);
+    if (!containerDBFile.exists()) {
+      throw new IOException("Db dir " + storageIdDir.getAbsolutePath() +
+          " not found for HddsVolume: " + getStorageDir().getAbsolutePath());
+    }
+
+    String containerDBPath = containerDBFile.getAbsolutePath();
+    try {
+      initPerDiskDBStore(containerDBPath, getConf());
+    } catch (IOException e) {
+      throw new IOException("Can't init db instance under path "
+          + containerDBPath + " for volume " + getStorageID(), e);
+    }
+
+    dbParentDir = storageIdDir;
+  }
+
+  /**
+   * Pick a DbVolume for this HddsVolume and initialize its db instance.
+   * Fall back to the HddsVolume itself if no DbVolume is found.
+   * @param dbVolumeSet the candidate DbVolumes, may be null or empty
+   */
+  public void createDbStore(MutableVolumeSet dbVolumeSet)
+      throws IOException {
+    DbVolume chosenDbVolume = null;
+    File clusterIdDir;
+
+    if (dbVolumeSet == null || dbVolumeSet.getVolumesList().isEmpty()) {
+      // No extra db volumes specified, just create db under the HddsVolume.
+      clusterIdDir = new File(getStorageDir(), getClusterID());
+    } else {
+      // Randomly choose a DbVolume for simplicity.
+      List<DbVolume> dbVolumeList = StorageVolumeUtil.getDbVolumesList(
+          dbVolumeSet.getVolumesList());
+      chosenDbVolume = dbVolumeList.get(
+          ThreadLocalRandom.current().nextInt(dbVolumeList.size()));
+      clusterIdDir = new File(chosenDbVolume.getStorageDir(), getClusterID());
+    }
+
+    if (!clusterIdDir.exists()) {
+      throw new IOException("The working dir "
+          + clusterIdDir.getAbsolutePath() + " is missing for volume "
+          + getStorageID());
+    }
+
+    // Init subdir with the storageID of HddsVolume.
+    File storageIdDir = new File(clusterIdDir, getStorageID());
+    if (!storageIdDir.mkdirs() && !storageIdDir.exists()) {
+      throw new IOException("Can't make subdir under "
+          + clusterIdDir.getAbsolutePath() + " for volume "
+          + getStorageID());
+    }
+
+    // Init the db instance for HddsVolume under the subdir above.
+    String containerDBPath = new File(storageIdDir, CONTAINER_DB_NAME)
+        .getAbsolutePath();
+    try {
+      initPerDiskDBStore(containerDBPath, getConf());
+    } catch (IOException e) {
+      throw new IOException("Can't init db instance under path "
+          + containerDBPath + " for volume " + getStorageID());
+    }
+
+    // Set the dbVolume and dbParentDir of the HddsVolume for db path lookup.
+    dbVolume = chosenDbVolume;
+    dbParentDir = storageIdDir;
+    if (chosenDbVolume != null) {
+      chosenDbVolume.addHddsDbStorePath(getStorageID(), containerDBPath);
+    }
+  }
+
+  private void closeDbStore() {
+    if (dbParentDir == null) {
+      return;
+    }
+
+    String containerDBPath = new File(dbParentDir, CONTAINER_DB_NAME)
+        .getAbsolutePath();
+    DatanodeStoreCache.getInstance().removeDB(containerDBPath);
+  }
 }
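
Note: the per-disk layout created above resolves to
<root>/<clusterID>/<storageID>/container.db, where <root> is the chosen
DbVolume if one exists, otherwise the HddsVolume itself. A minimal lookup
sketch (not part of this patch; the helper name is hypothetical, while
getDbParentDir(), getStorageID() and CONTAINER_DB_NAME are the members
used above):

    // Resolve the RocksDB path for a volume whose db store has already
    // been created or loaded; this is the same path closeDbStore() uses
    // to evict the instance from DatanodeStoreCache.
    static String resolveContainerDBPath(HddsVolume volume) {
      File dbParentDir = volume.getDbParentDir();
      if (dbParentDir == null) {
        throw new IllegalStateException("DB store not yet created/loaded"
            + " for volume " + volume.getStorageID());
      }
      return new File(dbParentDir, CONTAINER_DB_NAME).getAbsolutePath();
    }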
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolumeFactory.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolumeFactory.java
index 3b7b1085e5..afb301607e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolumeFactory.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolumeFactory.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.ozone.container.common.volume;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory;
-import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
 
 import java.io.IOException;
 
@@ -30,15 +29,10 @@ import java.io.IOException;
  */
 public class HddsVolumeFactory extends StorageVolumeFactory {
 
-  private String datanodeUuid;
-  private String clusterID;
-
   public HddsVolumeFactory(ConfigurationSource conf,
       SpaceUsageCheckFactory usageCheckFactory, MutableVolumeSet volumeSet,
       String datanodeUuid, String clusterID) {
-    super(conf, usageCheckFactory, volumeSet);
-    this.datanodeUuid = datanodeUuid;
-    this.clusterID = clusterID;
+    super(conf, usageCheckFactory, volumeSet, datanodeUuid, clusterID);
   }
 
   @Override
@@ -46,8 +40,8 @@ public class HddsVolumeFactory extends StorageVolumeFactory {
       StorageType storageType) throws IOException {
     HddsVolume.Builder volumeBuilder = new HddsVolume.Builder(locationString)
         .conf(getConf())
-        .datanodeUuid(datanodeUuid)
-        .clusterID(clusterID)
+        .datanodeUuid(getDatanodeUuid())
+        .clusterID(getClusterID())
         .usageCheckFactory(getUsageCheckFactory())
         .storageType(storageType)
         .volumeSet(getVolumeSet());
@@ -65,29 +59,4 @@ public class HddsVolumeFactory extends StorageVolumeFactory {
         .failedVolume(true);
     return volumeBuilder.build();
   }
-
-  /**
-   * If Version file exists and the {@link #clusterID} is not set yet,
-   * assign it the value from Version file. Otherwise, check that the given
-   * id matches with the id from version file.
-   * @param idFromVersionFile value of the property from Version file
-   * @throws InconsistentStorageStateException
-   */
-  private void checkAndSetClusterID(String idFromVersionFile)
-      throws InconsistentStorageStateException {
-    // If the clusterID is null (not set), assign it the value
-    // from version file.
-    if (this.clusterID == null) {
-      this.clusterID = idFromVersionFile;
-      return;
-    }
-
-    // If the clusterID is already set, it should match with the value from the
-    // version file.
-    if (!idFromVersionFile.equals(this.clusterID)) {
-      throw new InconsistentStorageStateException(
-          "Mismatched ClusterIDs. VolumeSet has: " + this.clusterID +
-              ", and version file has: " + idFromVersionFile);
-    }
-  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolume.java
index c5532ffdbb..360c2c7baf 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolume.java
@@ -49,4 +49,9 @@ public class MetadataVolume extends StorageVolume {
       return new MetadataVolume(this);
     }
   }
+
+  @Override
+  public String getStorageID() {
+    return "";
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolumeFactory.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolumeFactory.java
index b83cb3883c..cffe5b7f44 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolumeFactory.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolumeFactory.java
@@ -31,7 +31,7 @@ public class MetadataVolumeFactory extends StorageVolumeFactory {
 
   public MetadataVolumeFactory(ConfigurationSource conf,
       SpaceUsageCheckFactory usageCheckFactory, MutableVolumeSet volumeSet) {
-    super(conf, usageCheckFactory, volumeSet);
+    super(conf, usageCheckFactory, volumeSet, null, null);
   }
 
   @Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java
index 98e16294da..78f0b9c4b7 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java
@@ -121,6 +121,10 @@ public class MutableVolumeSet implements VolumeSet {
       this.volumeFactory = new MetadataVolumeFactory(conf, usageCheckFactory,
           this);
       maxVolumeFailuresTolerated = dnConf.getFailedMetadataVolumesTolerated();
+    } else if (volumeType == StorageVolume.VolumeType.DB_VOLUME) {
+      this.volumeFactory = new DbVolumeFactory(conf, usageCheckFactory,
+          this, datanodeUuid, clusterID);
+      maxVolumeFailuresTolerated = dnConf.getFailedDbVolumesTolerated();
     } else {
       this.volumeFactory = new HddsVolumeFactory(conf, usageCheckFactory,
           this, datanodeUuid, clusterID);
@@ -150,6 +154,8 @@ public class MutableVolumeSet implements VolumeSet {
     Collection<String> rawLocations;
     if (volumeType == StorageVolume.VolumeType.META_VOLUME) {
       rawLocations = HddsServerUtil.getOzoneDatanodeRatisDirectory(conf);
+    } else if (volumeType == StorageVolume.VolumeType.DB_VOLUME) {
+      rawLocations = HddsServerUtil.getDatanodeDbDirs(conf);
     } else {
       rawLocations = HddsServerUtil.getDatanodeStorageDirs(conf);
     }
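
Note: with this wiring, dedicated DB disks come from the datanode DB-dir
setting. A minimal setup sketch, assuming the MutableVolumeSet constructor
used elsewhere in this patch and caller-provided datanodeUuid, context and
volumeChecker:

    // Configure two dedicated db disks, then build the DB volume set.
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.set(OzoneConfigKeys.HDDS_DATANODE_CONTAINER_DB_DIR,
        "/ssd1/ozone-db,/ssd2/ozone-db");
    MutableVolumeSet dbVolumes = new MutableVolumeSet(datanodeUuid, conf,
        context, StorageVolume.VolumeType.DB_VOLUME, volumeChecker);

If no DB dirs are configured, HddsServerUtil.getDatanodeDbDirs(conf)
returns an empty collection and OzoneContainer (below) skips creating the
dbVolumeSet altogether.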
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
index 715cb8400f..18468f5a9d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
@@ -18,35 +18,91 @@
 
 package org.apache.hadoop.ozone.container.common.volume;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
 import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
+import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
+import org.apache.hadoop.ozone.container.common.helpers.DatanodeVersionFile;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.Objects;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.hadoop.ozone.container.common.HDDSVolumeLayoutVersion.getLatestVersion;
 
 /**
  * StorageVolume represents a generic Volume in datanode, could be
  * 1. HddsVolume for container storage.
  * 2. MetadataVolume for metadata(ratis) storage.
+ *    This is a special type of volume: it is managed by Ratis
+ *    itself, so Ozone does not format or initialize it.
+ * 3. DbVolume for db instance storage.
+ *
+ * Each formatted volume has its own VERSION file, and one clusterUuid
+ * directory for each SCM it is a part of.
+ *
+ * During DN startup, if the VERSION file exists, we verify that the
+ * clusterID in the version file matches the clusterID from SCM.
  */
 public abstract class StorageVolume
     implements Checkable<Boolean, VolumeCheckResult> {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StorageVolume.class);
+
   /**
    * Type for StorageVolume.
    */
   public enum VolumeType {
     DATA_VOLUME,
-    META_VOLUME
+    META_VOLUME,
+    DB_VOLUME,
   }
 
+  /**
+   * VolumeState represents the different states a StorageVolume can be in.
+   * NORMAL          =&gt; Volume can be used for storage
+ * FAILED          =&gt; Volume has failed and can no longer be used for
+ *                    storing containers.
+ * NON_EXISTENT    =&gt; Volume Root dir does not exist
+ * INCONSISTENT    =&gt; Volume Root dir is not empty but VERSION file is
+ *                    missing or Volume Root dir is not a directory
+ * NOT_FORMATTED   =&gt; Volume Root exists but is not formatted (no VERSION file)
+   * NOT_INITIALIZED =&gt; VERSION file exists but has not been verified for
+   *                    correctness.
+   */
+  public enum VolumeState {
+    NORMAL,
+    FAILED,
+    NON_EXISTENT,
+    INCONSISTENT,
+    NOT_FORMATTED,
+    NOT_INITIALIZED
+  }
+
+  private VolumeState state;
+
+  // VERSION file properties
+  private String storageID;       // id of the file system
+  private String clusterID;       // id of the cluster
+  private String datanodeUuid;    // id of the DataNode
+  private long cTime;             // creation time of the file system state
+  private int layoutVersion;      // layout version of the storage data
+
+  private ConfigurationSource conf;
+
   private final File storageDir;
 
   private final VolumeInfo volumeInfo;
@@ -62,13 +118,178 @@ public abstract class StorageVolume
           .usageCheckFactory(b.usageCheckFactory)
           .build();
       this.volumeSet = b.volumeSet;
+      this.state = VolumeState.NOT_INITIALIZED;
+      this.clusterID = b.clusterID;
+      this.datanodeUuid = b.datanodeUuid;
+      this.conf = b.conf;
     } else {
       storageDir = new File(b.volumeRootStr);
       this.volumeInfo = null;
       this.volumeSet = null;
+      this.storageID = UUID.randomUUID().toString();
+      this.state = VolumeState.FAILED;
+    }
+  }
+
+  public void format(String cid) throws IOException {
+    Preconditions.checkNotNull(cid, "clusterID cannot be null while " +
+        "formatting Volume");
+    this.clusterID = cid;
+    initialize();
+  }
+
+  /**
+   * Initializes the volume.
+   * Creates the VERSION file if it is not present; otherwise reads and
+   * verifies it, and throws IOException if the volume root is inconsistent.
+   * @throws IOException
+   */
+  protected void initialize() throws IOException {
+    VolumeState initialVolumeState = analyzeVolumeState();
+    switch (initialVolumeState) {
+    case NON_EXISTENT:
+      // Root directory does not exist. Create it.
+      if (!getStorageDir().mkdirs()) {
+        throw new IOException("Cannot create directory " + getStorageDir());
+      }
+      setState(VolumeState.NOT_FORMATTED);
+      createVersionFile();
+      break;
+    case NOT_FORMATTED:
+      // Version File does not exist. Create it.
+      createVersionFile();
+      break;
+    case NOT_INITIALIZED:
+      // Version File exists.
+      // Verify its correctness and update property fields.
+      readVersionFile();
+      setState(VolumeState.NORMAL);
+      break;
+    case INCONSISTENT:
+      // Volume Root is in an inconsistent state. Skip loading this volume.
+      throw new IOException("Volume is in an " + VolumeState.INCONSISTENT +
+          " state. Skipped loading volume: " + getStorageDir().getPath());
+    default:
+      throw new IOException("Unrecognized initial state : " +
+          intialVolumeState + "of volume : " + getStorageDir());
     }
   }
 
+  /**
+   * Create the working directory for cluster data on this volume.
+   * @param workingDirName scmID or clusterID according to SCM HA config
+   * @param dbVolumeSet optional dbVolumes
+   * @throws IOException
+   */
+  public void createWorkingDir(String workingDirName,
+      MutableVolumeSet dbVolumeSet) throws IOException {
+    File idDir = new File(getStorageDir(), workingDirName);
+    if (!idDir.mkdir()) {
+      throw new IOException("Unable to create ID directory " + idDir +
+          " for datanode.");
+    }
+  }
+
+  private VolumeState analyzeVolumeState() {
+    if (!getStorageDir().exists()) {
+      // Volume Root does not exist.
+      return VolumeState.NON_EXISTENT;
+    }
+    if (!getStorageDir().isDirectory()) {
+      // Volume Root exists but is not a directory.
+      LOG.warn("Volume {} exists but is not a directory,"
+              + " current volume state: {}.",
+          getStorageDir().getPath(), VolumeState.INCONSISTENT);
+      return VolumeState.INCONSISTENT;
+    }
+    File[] files = getStorageDir().listFiles();
+    if (files == null || files.length == 0) {
+      // Volume Root exists and is empty.
+      return VolumeState.NOT_FORMATTED;
+    }
+    if (!getVersionFile().exists()) {
+      // Volume Root is non empty but VERSION file does not exist.
+      LOG.warn("VERSION file does not exist in volume {},"
+              + " current volume state: {}.",
+          getStorageDir().getPath(), VolumeState.INCONSISTENT);
+      return VolumeState.INCONSISTENT;
+    }
+    // Volume Root and VERSION file exist.
+    return VolumeState.NOT_INITIALIZED;
+  }
+
+  /**
+   * Create Version File and write property fields into it.
+   * @throws IOException
+   */
+  private void createVersionFile() throws IOException {
+    this.storageID = StorageVolumeUtil.generateUuid();
+    this.cTime = Time.now();
+    this.layoutVersion = getLatestVersion().getVersion();
+
+    if (this.clusterID == null || datanodeUuid == null) {
+      // HddsDatanodeService does not have the cluster information yet. Wait
+      // for registration with SCM.
+      LOG.debug("ClusterID not available. Cannot format the volume {}",
+          getStorageDir().getPath());
+      setState(VolumeState.NOT_FORMATTED);
+    } else {
+      // Write the version file to disk.
+      writeVersionFile();
+      setState(VolumeState.NORMAL);
+    }
+  }
+
+  private void writeVersionFile() throws IOException {
+    Preconditions.checkNotNull(this.storageID,
+        "StorageID cannot be null in Version File");
+    Preconditions.checkNotNull(this.clusterID,
+        "ClusterID cannot be null in Version File");
+    Preconditions.checkNotNull(this.datanodeUuid,
+        "DatanodeUUID cannot be null in Version File");
+    Preconditions.checkArgument(this.cTime > 0,
+        "Creation Time should be positive");
+    Preconditions.checkArgument(this.layoutVersion ==
+            getLatestVersion().getVersion(),
+        "Version File should have the latest LayOutVersion");
+
+    File versionFile = getVersionFile();
+    LOG.debug("Writing Version file to disk, {}", versionFile);
+
+    DatanodeVersionFile dnVersionFile = new DatanodeVersionFile(this.storageID,
+        this.clusterID, this.datanodeUuid, this.cTime, this.layoutVersion);
+    dnVersionFile.createVersionFile(versionFile);
+  }
+
+  /**
+   * Read Version File and update property fields.
+   * Get common storage fields.
+   * Should be overloaded if additional fields need to be read.
+   *
+   * @throws IOException on error
+   */
+  private void readVersionFile() throws IOException {
+    File versionFile = getVersionFile();
+    Properties props = DatanodeVersionFile.readFrom(versionFile);
+    if (props.isEmpty()) {
+      throw new InconsistentStorageStateException(
+          "Version file " + versionFile + " is missing");
+    }
+
+    LOG.debug("Reading Version file from disk, {}", versionFile);
+    this.storageID = StorageVolumeUtil.getStorageID(props, versionFile);
+    this.clusterID = StorageVolumeUtil.getClusterID(props, versionFile,
+        this.clusterID);
+    this.datanodeUuid = StorageVolumeUtil.getDatanodeUUID(props, versionFile,
+        this.datanodeUuid);
+    this.cTime = StorageVolumeUtil.getCreationTime(props, versionFile);
+    this.layoutVersion = StorageVolumeUtil.getLayOutVersion(props, versionFile);
+  }
+
+  private File getVersionFile() {
+    return StorageVolumeUtil.getVersionFile(getStorageDir());
+  }
+
   /**
    * Builder class for StorageVolume.
    * @param <T> subclass Builder
@@ -81,6 +302,8 @@ public abstract class StorageVolume
     private SpaceUsageCheckFactory usageCheckFactory;
     private VolumeSet volumeSet;
     private boolean failedVolume = false;
+    private String datanodeUuid;
+    private String clusterID;
 
     public Builder(String volumeRootStr, String storageDirStr) {
       this.volumeRootStr = volumeRootStr;
@@ -117,6 +340,16 @@ public abstract class StorageVolume
       return this.getThis();
     }
 
+    public T datanodeUuid(String datanodeUUID) {
+      this.datanodeUuid = datanodeUUID;
+      return this.getThis();
+    }
+
+    public T clusterID(String cid) {
+      this.clusterID = cid;
+      return this.getThis();
+    }
+
     public abstract StorageVolume build() throws IOException;
 
     public String getVolumeRootStr() {
@@ -168,16 +401,50 @@ public abstract class StorageVolume
   }
 
   public String getStorageID() {
-    return "";
+    return storageID;
+  }
+
+  public String getClusterID() {
+    return clusterID;
+  }
+
+  public String getDatanodeUuid() {
+    return datanodeUuid;
+  }
+
+  public long getCTime() {
+    return cTime;
+  }
+
+  public int getLayoutVersion() {
+    return layoutVersion;
+  }
+
+  public VolumeState getStorageState() {
+    return state;
+  }
+
+  public void setState(VolumeState state) {
+    this.state = state;
+  }
+
+  public boolean isFailed() {
+    return (state == VolumeState.FAILED);
+  }
+
+  public ConfigurationSource getConf() {
+    return conf;
   }
 
   public void failVolume() {
+    setState(VolumeState.FAILED);
     if (volumeInfo != null) {
       volumeInfo.shutdownUsageThread();
     }
   }
 
   public void shutdown() {
+    setState(VolumeState.NON_EXISTENT);
     if (volumeInfo != null) {
       volumeInfo.shutdownUsageThread();
     }
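
Note: with the lifecycle hosted in StorageVolume, a volume moves
NOT_INITIALIZED -> NOT_FORMATTED -> NORMAL as the clusterID becomes known.
A first-boot sketch modeled on the test setup later in this patch
(MockSpaceUsageCheckFactory is a test-only helper):

    // First-time initialization of an HddsVolume once clusterId is known.
    HddsVolume volume = new HddsVolume.Builder(volumeDir.getPath())
        .datanodeUuid(datanodeUuid)
        .conf(conf)
        .usageCheckFactory(MockSpaceUsageCheckFactory.NONE)
        .build();
    volume.format(clusterId);                 // writes VERSION, state -> NORMAL
    volume.createWorkingDir(clusterId, null); // creates <root>/<clusterId>
    assert volume.getStorageState() == StorageVolume.VolumeState.NORMAL;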
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeFactory.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeFactory.java
index 9273f3567a..7527eb807b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeFactory.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeFactory.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.container.common.volume;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory;
+import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
 
 import java.io.IOException;
 
@@ -32,12 +33,17 @@ public abstract class StorageVolumeFactory {
   private ConfigurationSource conf;
   private SpaceUsageCheckFactory usageCheckFactory;
   private MutableVolumeSet volumeSet;
+  private String datanodeUuid;
+  private String clusterID;
 
   public StorageVolumeFactory(ConfigurationSource conf,
-      SpaceUsageCheckFactory usageCheckFactory, MutableVolumeSet volumeSet) {
+      SpaceUsageCheckFactory usageCheckFactory, MutableVolumeSet volumeSet,
+      String datanodeUuid, String clusterID) {
     this.conf = conf;
     this.usageCheckFactory = usageCheckFactory;
     this.volumeSet = volumeSet;
+    this.datanodeUuid = datanodeUuid;
+    this.clusterID = clusterID;
   }
 
   public ConfigurationSource getConf() {
@@ -52,6 +58,39 @@ public abstract class StorageVolumeFactory {
     return this.volumeSet;
   }
 
+  public String getDatanodeUuid() {
+    return this.datanodeUuid;
+  }
+
+  public String getClusterID() {
+    return this.clusterID;
+  }
+
+  /**
+   * If Version file exists and the {@link #clusterID} is not set yet,
+   * assign it the value from Version file. Otherwise, check that the given
+   * id matches with the id from version file.
+   * @param idFromVersionFile value of the property from Version file
+   * @throws InconsistentStorageStateException
+   */
+  protected void checkAndSetClusterID(String idFromVersionFile)
+      throws InconsistentStorageStateException {
+    // If the clusterID is null (not set), assign it the value
+    // from version file.
+    if (this.clusterID == null) {
+      this.clusterID = idFromVersionFile;
+      return;
+    }
+
+    // If the clusterID is already set, it should match with the value from the
+    // version file.
+    if (!idFromVersionFile.equals(this.clusterID)) {
+      throw new InconsistentStorageStateException(
+          "Mismatched ClusterIDs. VolumeSet has: " + this.clusterID +
+              ", and version file has: " + idFromVersionFile);
+    }
+  }
+
   abstract StorageVolume createVolume(String locationString,
       StorageType storageType) throws IOException;
 
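
Note: the clusterID reconciliation that used to be private to
HddsVolumeFactory is now shared by every factory. A sketch of the contract,
for a hypothetical subclass that has read idFromVersionFile from an
existing VERSION file:

    // First sighting adopts the VERSION-file value into the factory; a
    // later mismatch throws InconsistentStorageStateException.
    checkAndSetClusterID(idFromVersionFile);
    String reconciledId = getClusterID(); // non-null from here on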
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 1af9c882a1..2709ea5e3e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdds.security.token.TokenVerifier;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.utils.HAUtils;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
@@ -54,6 +55,7 @@ import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGr
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
 import org.apache.hadoop.ozone.container.common.utils.ContainerInspectorUtil;
+import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
@@ -62,6 +64,7 @@ import org.apache.hadoop.ozone.container.common.volume.StorageVolumeChecker;
 import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
 import org.apache.hadoop.ozone.container.replication.ReplicationServer;
 import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
+import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures.SchemaV3;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -91,6 +94,7 @@ public class OzoneContainer {
   private final ConfigurationSource config;
   private final MutableVolumeSet volumeSet;
   private final MutableVolumeSet metaVolumeSet;
+  private final MutableVolumeSet dbVolumeSet;
   private final StorageVolumeChecker volumeChecker;
   private final ContainerSet containerSet;
   private final XceiverServerSpi writeChannel;
@@ -133,6 +137,14 @@ public class OzoneContainer {
     volumeSet.setFailedVolumeListener(this::handleVolumeFailures);
     metaVolumeSet = new MutableVolumeSet(datanodeDetails.getUuidString(), conf,
         context, VolumeType.META_VOLUME, volumeChecker);
+    if (SchemaV3.isFinalizedAndEnabled(conf)) {
+      dbVolumeSet = HddsServerUtil.getDatanodeDbDirs(conf).isEmpty() ? null :
+          new MutableVolumeSet(datanodeDetails.getUuidString(), conf,
+              context, VolumeType.DB_VOLUME, volumeChecker);
+      HddsVolumeUtil.loadAllHddsVolumeDbStore(volumeSet, dbVolumeSet, LOG);
+    } else {
+      dbVolumeSet = null;
+    }
 
     containerSet = new ContainerSet();
     metadataScanner = null;
@@ -363,6 +375,9 @@ public class OzoneContainer {
     volumeChecker.shutdownAndWait(0, TimeUnit.SECONDS);
     volumeSet.shutdown();
     metaVolumeSet.shutdown();
+    if (dbVolumeSet != null) {
+      dbVolumeSet.shutdown();
+    }
     blockDeletingService.shutdown();
     ContainerMetrics.remove();
   }
@@ -421,6 +436,14 @@ public class OzoneContainer {
       nrb.addMetadataStorageReport(
           metaReports[i].getMetadataProtoBufMessage());
     }
+
+    if (dbVolumeSet != null) {
+      StorageLocationReport[] dbReports = dbVolumeSet.getStorageReport();
+      for (int i = 0; i < dbReports.length; i++) {
+        nrb.addDbStorageReport(dbReports[i].getProtoBufMessage());
+      }
+    }
+
     return nrb.build();
   }
 
@@ -437,9 +460,12 @@ public class OzoneContainer {
     return metaVolumeSet;
   }
 
+  public MutableVolumeSet getDbVolumeSet() {
+    return dbVolumeSet;
+  }
+
   @VisibleForTesting
   StorageVolumeChecker getVolumeChecker(ConfigurationSource conf) {
     return new StorageVolumeChecker(conf, new Timer());
   }
-
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/ScmHAFinalizeUpgradeActionDatanode.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/ScmHAFinalizeUpgradeActionDatanode.java
index b4c130cfd1..3a830d00ae 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/ScmHAFinalizeUpgradeActionDatanode.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/ScmHAFinalizeUpgradeActionDatanode.java
@@ -71,7 +71,7 @@ public class ScmHAFinalizeUpgradeActionDatanode
    * Upgrade the specified volume to be compatible with SCM HA layout feature.
    * @return true if the volume upgrade succeeded, false otherwise.
    */
-  public static boolean upgradeVolume(HddsVolume volume, String clusterID) {
+  public static boolean upgradeVolume(StorageVolume volume, String clusterID) {
     Preconditions.checkNotNull(clusterID, "Cannot upgrade volume with null " +
         "cluster ID");
     File hddsVolumeDir = volume.getStorageDir();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/VersionedDatanodeFeatures.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/VersionedDatanodeFeatures.java
index 3653e6c9fa..5f52191e36 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/VersionedDatanodeFeatures.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/VersionedDatanodeFeatures.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 
 import java.io.File;
@@ -127,7 +127,7 @@ public final class VersionedDatanodeFeatures {
       }
     }
 
-    public static boolean upgradeVolumeIfNeeded(HddsVolume volume,
+    public static boolean upgradeVolumeIfNeeded(StorageVolume volume,
         String clusterID) {
       File clusterIDDir = new File(volume.getStorageDir(), clusterID);
       boolean needsUpgrade = isFinalized(HDDSLayoutFeature.SCM_HA) &&
@@ -142,4 +142,28 @@ public final class VersionedDatanodeFeatures {
       return success;
     }
   }
+
+  /**
+   * Utilities for container Schema V3 layout feature.
+   * This schema put all container metadata info into a per-disk
+   * rocksdb instance instead of a per-container instance.
+   */
+  public static class SchemaV3 {
+    public static String chooseSchemaVersion(ConfigurationSource conf) {
+      if (isFinalizedAndEnabled(conf)) {
+        return OzoneConsts.SCHEMA_V3;
+      } else {
+        return SchemaV2.chooseSchemaVersion();
+      }
+    }
+
+    public static boolean isFinalizedAndEnabled(ConfigurationSource conf) {
+      DatanodeConfiguration dcf = conf.getObject(DatanodeConfiguration.class);
+      // Schema V3 requires both the finalized layout feature and the
+      // datanode config switch.
+      return isFinalized(HDDSLayoutFeature.DATANODE_SCHEMA_V3)
+          && dcf.getContainerSchemaV3Enabled();
+    }
+  }
 }
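
Note: a typical call-site sketch for the new utility, assuming conf is the
datanode's ConfigurationSource:

    // Pick the metadata schema for a newly created container.
    String schemaVersion =
        VersionedDatanodeFeatures.SchemaV3.chooseSchemaVersion(conf);
    // This is OzoneConsts.SCHEMA_V3 only when DATANODE_SCHEMA_V3 is
    // finalized and the datanode config switch is on; otherwise it falls
    // back to SchemaV2.chooseSchemaVersion().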
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
index 825432290d..405908de3e 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
@@ -125,4 +126,16 @@ public final class ContainerTestUtils {
     kvData.setState(state);
     return new KeyValueContainer(kvData, new OzoneConfiguration());
   }
+
+  public static void enableSchemaV3(OzoneConfiguration conf) {
+    DatanodeConfiguration dc = conf.getObject(DatanodeConfiguration.class);
+    dc.setContainerSchemaV3Enabled(true);
+    conf.setFromObject(dc);
+  }
+
+  public static void disableSchemaV3(OzoneConfiguration conf) {
+    DatanodeConfiguration dc = conf.getObject(DatanodeConfiguration.class);
+    dc.setContainerSchemaV3Enabled(false);
+    conf.setFromObject(dc);
+  }
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestDatanodeVersionFile.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestDatanodeVersionFile.java
index 44f0a7f8c4..02b673bfaa 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestDatanodeVersionFile.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestDatanodeVersionFile.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.ozone.container.common.helpers;
 
 import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
 import org.apache.hadoop.ozone.container.common.HDDSVolumeLayoutVersion;
-import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.Before;
@@ -77,15 +77,15 @@ public class TestDatanodeVersionFile {
     //Check VersionFile exists
     assertTrue(versionFile.exists());
 
-    assertEquals(storageID, HddsVolumeUtil.getStorageID(
+    assertEquals(storageID, StorageVolumeUtil.getStorageID(
         properties, versionFile));
-    assertEquals(clusterID, HddsVolumeUtil.getClusterID(
+    assertEquals(clusterID, StorageVolumeUtil.getClusterID(
         properties, versionFile, clusterID));
-    assertEquals(datanodeUUID, HddsVolumeUtil.getDatanodeUUID(
+    assertEquals(datanodeUUID, StorageVolumeUtil.getDatanodeUUID(
         properties, versionFile, datanodeUUID));
-    assertEquals(cTime, HddsVolumeUtil.getCreationTime(
+    assertEquals(cTime, StorageVolumeUtil.getCreationTime(
         properties, versionFile));
-    assertEquals(lv, HddsVolumeUtil.getLayOutVersion(
+    assertEquals(lv, StorageVolumeUtil.getLayOutVersion(
         properties, versionFile));
   }
 
@@ -93,7 +93,7 @@ public class TestDatanodeVersionFile {
   public void testIncorrectClusterId() throws IOException {
     try {
       String randomClusterID = UUID.randomUUID().toString();
-      HddsVolumeUtil.getClusterID(properties, versionFile,
+      StorageVolumeUtil.getClusterID(properties, versionFile,
           randomClusterID);
       fail("Test failure in testIncorrectClusterId");
     } catch (InconsistentStorageStateException ex) {
@@ -110,7 +110,7 @@ public class TestDatanodeVersionFile {
     properties = dnVersionFile.readFrom(versionFile);
 
     try {
-      HddsVolumeUtil.getCreationTime(properties, versionFile);
+      StorageVolumeUtil.getCreationTime(properties, versionFile);
       fail("Test failure in testVerifyCTime");
     } catch (InconsistentStorageStateException ex) {
       GenericTestUtils.assertExceptionContains("Invalid Creation time in " +
@@ -127,7 +127,7 @@ public class TestDatanodeVersionFile {
     Properties props = dnVersionFile.readFrom(versionFile);
 
     try {
-      HddsVolumeUtil.getLayOutVersion(props, versionFile);
+      StorageVolumeUtil.getLayOutVersion(props, versionFile);
       fail("Test failure in testVerifyLayOut");
     } catch (InconsistentStorageStateException ex) {
       GenericTestUtils.assertExceptionContains("Invalid layOutVersion.", ex);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java
index 5f1b0a6320..2bd1f0b59f 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java
@@ -28,6 +28,7 @@ import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConf
 import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_MIN_GAP_DEFAULT;
 import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_TIMEOUT_DEFAULT;
 import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_TIMEOUT_KEY;
+import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.FAILED_DB_VOLUMES_TOLERATED_KEY;
 import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY;
 import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT;
 import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.FAILED_DATA_VOLUMES_TOLERATED_KEY;
@@ -57,6 +58,8 @@ public class TestDatanodeConfiguration {
         validFailedVolumesTolerated);
     conf.setInt(FAILED_METADATA_VOLUMES_TOLERATED_KEY,
         validFailedVolumesTolerated);
+    conf.setInt(FAILED_DB_VOLUMES_TOLERATED_KEY,
+        validFailedVolumesTolerated);
     conf.setTimeDuration(DISK_CHECK_MIN_GAP_KEY,
         validDiskCheckMinGap, TimeUnit.MINUTES);
     conf.setTimeDuration(DISK_CHECK_TIMEOUT_KEY,
@@ -73,6 +76,8 @@ public class TestDatanodeConfiguration {
         subject.getFailedDataVolumesTolerated());
     assertEquals(validFailedVolumesTolerated,
         subject.getFailedMetadataVolumesTolerated());
+    assertEquals(validFailedVolumesTolerated,
+        subject.getFailedDbVolumesTolerated());
     assertEquals(validDiskCheckMinGap,
         subject.getDiskCheckMinGap().toMinutes());
     assertEquals(validDiskCheckTimeout,
@@ -95,6 +100,8 @@ public class TestDatanodeConfiguration {
         invalidFailedVolumesTolerated);
     conf.setInt(FAILED_METADATA_VOLUMES_TOLERATED_KEY,
         invalidFailedVolumesTolerated);
+    conf.setInt(FAILED_DB_VOLUMES_TOLERATED_KEY,
+        invalidFailedVolumesTolerated);
     conf.setTimeDuration(DISK_CHECK_MIN_GAP_KEY,
         invalidDiskCheckMinGap, TimeUnit.MINUTES);
     conf.setTimeDuration(DISK_CHECK_TIMEOUT_KEY,
@@ -112,6 +119,8 @@ public class TestDatanodeConfiguration {
         subject.getFailedDataVolumesTolerated());
     assertEquals(FAILED_VOLUMES_TOLERATED_DEFAULT,
         subject.getFailedMetadataVolumesTolerated());
+    assertEquals(FAILED_VOLUMES_TOLERATED_DEFAULT,
+        subject.getFailedDbVolumesTolerated());
     assertEquals(DISK_CHECK_MIN_GAP_DEFAULT,
         subject.getDiskCheckMinGap().toMillis());
     assertEquals(DISK_CHECK_TIMEOUT_DEFAULT,
@@ -135,6 +144,8 @@ public class TestDatanodeConfiguration {
         subject.getFailedDataVolumesTolerated());
     assertEquals(FAILED_VOLUMES_TOLERATED_DEFAULT,
         subject.getFailedMetadataVolumesTolerated());
+    assertEquals(FAILED_VOLUMES_TOLERATED_DEFAULT,
+        subject.getFailedDbVolumesTolerated());
     assertEquals(DISK_CHECK_MIN_GAP_DEFAULT,
         subject.getDiskCheckMinGap().toMillis());
     assertEquals(DISK_CHECK_TIMEOUT_DEFAULT,
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/utils/TestHddsVolumeUtil.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/utils/TestHddsVolumeUtil.java
new file mode 100644
index 0000000000..de3fc3d16d
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/utils/TestHddsVolumeUtil.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.utils;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.volume.DbVolume;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for {@link HddsVolumeUtil}.
+ */
+public class TestHddsVolumeUtil {
+  @Rule
+  public final TemporaryFolder tempDir = new TemporaryFolder();
+
+  private final String datanodeId = UUID.randomUUID().toString();
+  private final String clusterId = UUID.randomUUID().toString();
+  private final OzoneConfiguration conf = new OzoneConfiguration();
+  private static final int VOLUME_NUM = 3;
+  private MutableVolumeSet hddsVolumeSet;
+  private MutableVolumeSet dbVolumeSet;
+
+  @Before
+  public void setup() throws Exception {
+    ContainerTestUtils.enableSchemaV3(conf);
+
+    // Create hdds volumes for loadAll test.
+    File[] hddsVolumeDirs = new File[VOLUME_NUM];
+    StringBuilder hddsDirs = new StringBuilder();
+    for (int i = 0; i < VOLUME_NUM; i++) {
+      hddsVolumeDirs[i] = tempDir.newFolder();
+      hddsDirs.append(hddsVolumeDirs[i]).append(",");
+    }
+    conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, hddsDirs.toString());
+    hddsVolumeSet = new MutableVolumeSet(datanodeId, clusterId, conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
+
+    // Create db volumes for format and loadAll test.
+    File[] dbVolumeDirs = new File[VOLUME_NUM];
+    StringBuilder dbDirs = new StringBuilder();
+    for (int i = 0; i < VOLUME_NUM; i++) {
+      dbVolumeDirs[i] = tempDir.newFolder();
+      dbDirs.append(dbVolumeDirs[i]).append(",");
+    }
+    conf.set(OzoneConfigKeys.HDDS_DATANODE_CONTAINER_DB_DIR,
+        dbDirs.toString());
+    dbVolumeSet = new MutableVolumeSet(datanodeId, clusterId, conf, null,
+        StorageVolume.VolumeType.DB_VOLUME, null);
+  }
+
+  @After
+  public void teardown() {
+    hddsVolumeSet.shutdown();
+    dbVolumeSet.shutdown();
+  }
+
+  @Test
+  public void testLoadAllHddsVolumeDbStoreWithoutDbVolumes()
+      throws IOException {
+    // Create db instances for all HddsVolumes.
+    for (HddsVolume hddsVolume : StorageVolumeUtil.getHddsVolumesList(
+        hddsVolumeSet.getVolumesList())) {
+      hddsVolume.format(clusterId);
+      hddsVolume.createWorkingDir(clusterId, null);
+    }
+
+    // Reinitialize all the volumes to simulate a DN restart.
+    reinitVolumes();
+    HddsVolumeUtil.loadAllHddsVolumeDbStore(hddsVolumeSet, null, null);
+
+    for (HddsVolume hddsVolume : StorageVolumeUtil.getHddsVolumesList(
+        hddsVolumeSet.getVolumesList())) {
+      File storageIdDir = new File(new File(hddsVolume.getStorageDir(),
+          clusterId), hddsVolume.getStorageID());
+
+      // No dbVolumes given, so use the hddsVolume to store db instance.
+      assertNull(hddsVolume.getDbVolume());
+      assertEquals(storageIdDir, hddsVolume.getDbParentDir());
+    }
+  }
+
+  @Test
+  public void testLoadAllHddsVolumeDbStoreWithDbVolumes()
+      throws IOException {
+    // Initialize all DbVolumes
+    for (DbVolume dbVolume : StorageVolumeUtil.getDbVolumesList(
+        dbVolumeSet.getVolumesList())) {
+      dbVolume.format(clusterId);
+      dbVolume.createWorkingDir(clusterId, null);
+    }
+
+    // Create db instances for all HddsVolumes.
+    for (HddsVolume hddsVolume : StorageVolumeUtil.getHddsVolumesList(
+        hddsVolumeSet.getVolumesList())) {
+      hddsVolume.format(clusterId);
+      hddsVolume.createWorkingDir(clusterId, dbVolumeSet);
+    }
+
+    // Reinitialize all the volumes to simulate a DN restart.
+    reinitVolumes();
+    HddsVolumeUtil.loadAllHddsVolumeDbStore(hddsVolumeSet, dbVolumeSet, null);
+
+    for (HddsVolume hddsVolume : StorageVolumeUtil.getHddsVolumesList(
+        hddsVolumeSet.getVolumesList())) {
+      File storageIdDir = new File(new File(hddsVolume.getStorageDir(),
+          clusterId), hddsVolume.getStorageID());
+
+      // Should not use the hddsVolume itself
+      assertNotNull(hddsVolume.getDbVolume());
+      assertNotNull(hddsVolume.getDbParentDir());
+      assertNotEquals(storageIdDir, hddsVolume.getDbParentDir());
+    }
+  }
+
+  @Test
+  public void testNoDupDbStoreCreatedWithBadDbVolumes()
+      throws IOException {
+    // Initialize all DbVolumes
+    for (DbVolume dbVolume : StorageVolumeUtil.getDbVolumesList(
+        dbVolumeSet.getVolumesList())) {
+      dbVolume.format(clusterId);
+      dbVolume.createWorkingDir(clusterId, null);
+    }
+
+    // Create db instances for all HddsVolumes.
+    for (HddsVolume hddsVolume : StorageVolumeUtil.getHddsVolumesList(
+        hddsVolumeSet.getVolumesList())) {
+      hddsVolume.format(clusterId);
+      hddsVolume.createWorkingDir(clusterId, dbVolumeSet);
+    }
+
+    // Pick a dbVolume that has db instances on it and make it fail,
+    // recording the storageIDs of the affected HddsVolumes.
+    int badDbVolumeCount = 0;
+    List<String> affectedHddsVolumeIDs = new ArrayList<>();
+    File badVolumeDir = null;
+    for (DbVolume dbVolume : StorageVolumeUtil.getDbVolumesList(
+        dbVolumeSet.getVolumesList())) {
+      if (!dbVolume.getHddsVolumeIDs().isEmpty()) {
+        affectedHddsVolumeIDs.addAll(dbVolume.getHddsVolumeIDs());
+        badVolumeDir = dbVolume.getStorageDir();
+        failVolume(badVolumeDir);
+        badDbVolumeCount++;
+        break;
+      }
+    }
+    assertEquals(1, badDbVolumeCount);
+    assertFalse(affectedHddsVolumeIDs.isEmpty());
+    assertNotNull(badVolumeDir);
+
+    // Reinitialize all the volumes to simulate a DN restart.
+    reinitVolumes();
+    assertEquals(1, dbVolumeSet.getFailedVolumesList().size());
+    assertEquals(VOLUME_NUM - 1, dbVolumeSet.getVolumesList().size());
+    HddsVolumeUtil.loadAllHddsVolumeDbStore(hddsVolumeSet, dbVolumeSet, null);
+
+    int affectedVolumeCount = 0;
+
+    for (HddsVolume hddsVolume : StorageVolumeUtil.getHddsVolumesList(
+        hddsVolumeSet.getVolumesList())) {
+      File storageIdDir = new File(new File(hddsVolume.getStorageDir(),
+          clusterId), hddsVolume.getStorageID());
+
+      // This hddsVolume itself is not failed, so we can still get it here.
+      if (affectedHddsVolumeIDs.contains(hddsVolume.getStorageID())) {
+        // Should not create a duplicate db instance
+        assertFalse(storageIdDir.exists());
+        assertNull(hddsVolume.getDbVolume());
+        assertNull(hddsVolume.getDbParentDir());
+        affectedVolumeCount++;
+      } else {
+        // Should not use the hddsVolume itself
+        assertNotNull(hddsVolume.getDbVolume());
+        assertNotNull(hddsVolume.getDbParentDir());
+        assertNotEquals(storageIdDir, hddsVolume.getDbParentDir());
+      }
+    }
+    assertEquals(affectedHddsVolumeIDs.size(), affectedVolumeCount);
+  }
+
+  private void reinitVolumes() throws IOException {
+    hddsVolumeSet.shutdown();
+    dbVolumeSet.shutdown();
+
+    dbVolumeSet = new MutableVolumeSet(datanodeId, conf, null,
+        StorageVolume.VolumeType.DB_VOLUME, null);
+    hddsVolumeSet = new MutableVolumeSet(datanodeId, conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
+  }
+
+  /**
+   * Fail a volume by removing the VERSION file.
+   * @param volumeDir
+   */
+  private void failVolume(File volumeDir) {
+    File versionFile = new File(volumeDir, "VERSION");
+    assertTrue(versionFile.delete());
+  }
+}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/utils/TestStorageVolumeUtil.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/utils/TestStorageVolumeUtil.java
new file mode 100644
index 0000000000..b7f1397ab3
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/utils/TestStorageVolumeUtil.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.utils;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory;
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.volume.DbVolume;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.UUID;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Test for {@link StorageVolumeUtil}.
+ */
+public class TestStorageVolumeUtil {
+  @Rule
+  public final TemporaryFolder folder = new TemporaryFolder();
+
+  private static final String DATANODE_UUID = UUID.randomUUID().toString();
+  private static final String CLUSTER_ID = UUID.randomUUID().toString();
+  private static final OzoneConfiguration CONF = new OzoneConfiguration();
+
+  private HddsVolume.Builder hddsVolumeBuilder;
+  private DbVolume.Builder dbVolumeBuilder;
+
+  @Before
+  public void setup() throws Exception {
+    hddsVolumeBuilder = new HddsVolume.Builder(folder.newFolder().getPath())
+        .datanodeUuid(DATANODE_UUID)
+        .conf(CONF)
+        .usageCheckFactory(MockSpaceUsageCheckFactory.NONE);
+    dbVolumeBuilder = new DbVolume.Builder(folder.newFolder().getPath())
+        .datanodeUuid(DATANODE_UUID)
+        .conf(CONF)
+        .usageCheckFactory(MockSpaceUsageCheckFactory.NONE);
+  }
+
+  @Test
+  public void testCheckVolumeNoDupDbStoreCreated() throws IOException {
+    ContainerTestUtils.enableSchemaV3(CONF);
+
+    HddsVolume hddsVolume = hddsVolumeBuilder.build();
+    HddsVolume spyHddsVolume = spy(hddsVolume);
+    DbVolume dbVolume = dbVolumeBuilder.build();
+    MutableVolumeSet dbVolumeSet = mock(MutableVolumeSet.class);
+    when(dbVolumeSet.getVolumesList())
+        .thenReturn(Collections.singletonList(dbVolume));
+
+    // Check the dbVolume first so the hddsVolume can pick it up below
+    boolean res = StorageVolumeUtil.checkVolume(dbVolume, CLUSTER_ID,
+        CLUSTER_ID, CONF, null, null);
+    assertTrue(res);
+
+    // checkVolume for the 1st time: rootFiles.length == 1
+    res = StorageVolumeUtil.checkVolume(spyHddsVolume, CLUSTER_ID,
+        CLUSTER_ID, CONF, null, dbVolumeSet);
+    assertTrue(res);
+    // createDbStore called as expected
+    verify(spyHddsVolume, times(1)).createDbStore(dbVolumeSet);
+
+    // checkVolume for the 2nd time: rootFiles.length == 2
+    res = StorageVolumeUtil.checkVolume(spyHddsVolume, CLUSTER_ID,
+        CLUSTER_ID, CONF, null, dbVolumeSet);
+    assertTrue(res);
+
+    // createDbStore should be called only once, so no duplicate db instance
+    verify(spyHddsVolume, times(1)).createDbStore(dbVolumeSet);
+  }
+}
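
For reference, the spy-and-verify idiom that testCheckVolumeNoDupDbStoreCreated
relies on can be reduced to the following self-contained sketch. It assumes
only JUnit 4 and Mockito; ResourceHolder and its methods are hypothetical
stand-ins for HddsVolume and createDbStore, not Ozone APIs.

    import static org.mockito.Mockito.spy;
    import static org.mockito.Mockito.times;
    import static org.mockito.Mockito.verify;

    import org.junit.Test;

    public class ResourceHolderTest {

      // Hypothetical class with an expensive one-time setup, analogous to
      // createDbStore being guarded inside checkVolume.
      public static class ResourceHolder {
        private boolean created = false;

        public void ensureResource() {
          if (!created) {          // guard: skip setup on repeated calls
            createResource();
            created = true;
          }
        }

        public void createResource() {
          // expensive one-time setup goes here
        }
      }

      @Test
      public void createHappensExactlyOnce() {
        ResourceHolder holder = spy(new ResourceHolder());
        holder.ensureResource();   // first call triggers createResource()
        holder.ensureResource();   // second call is a no-op due to the guard
        verify(holder, times(1)).createResource();
      }
    }

The spy records invocations while still executing the real methods, so a
second createResource() call would make times(1) fail, just as a duplicate db
instance would be caught in the test above.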
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestDbVolume.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestDbVolume.java
new file mode 100644
index 0000000000..b0f0821943
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestDbVolume.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.volume;
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.utils.DatanodeStoreCache;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Unit tests for {@link DbVolume}.
+ */
+public class TestDbVolume {
+
+  private static final String DATANODE_UUID = UUID.randomUUID().toString();
+  private static final String CLUSTER_ID = UUID.randomUUID().toString();
+  private static final OzoneConfiguration CONF = new OzoneConfiguration();
+
+  private DbVolume.Builder volumeBuilder;
+  private File versionFile;
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  @Before
+  public void setup() throws Exception {
+    File rootDir = new File(folder.getRoot(), DbVolume.DB_VOLUME_DIR);
+    volumeBuilder = new DbVolume.Builder(folder.getRoot().getPath())
+        .datanodeUuid(DATANODE_UUID)
+        .conf(CONF)
+        .usageCheckFactory(MockSpaceUsageCheckFactory.NONE);
+    versionFile = StorageVolumeUtil.getVersionFile(rootDir);
+  }
+
+  @Test
+  public void testInitializeEmptyDbVolume() throws IOException {
+    DbVolume volume = volumeBuilder.build();
+
+    // The initial state of the DbVolume should be "NOT_FORMATTED" when
+    // clusterID is not specified and the version file should not be written
+    // to disk.
+    assertNull(volume.getClusterID());
+    assertEquals(StorageType.DEFAULT, volume.getStorageType());
+    assertEquals(HddsVolume.VolumeState.NOT_FORMATTED,
+        volume.getStorageState());
+    assertFalse("Version file should not be created when clusterID is not " +
+        "known.", versionFile.exists());
+
+    // Format the volume with clusterID.
+    volume.format(CLUSTER_ID);
+
+    // The state of the DbVolume after formatting with clusterID should be
+    // NORMAL and the version file should exist.
+    assertTrue("Volume format should create Version file",
+        versionFile.exists());
+    assertEquals(CLUSTER_ID, volume.getClusterID());
+    assertEquals(HddsVolume.VolumeState.NORMAL, volume.getStorageState());
+    assertEquals(0, volume.getHddsVolumeIDs().size());
+  }
+
+  @Test
+  public void testInitializeNonEmptyDbVolume() throws IOException {
+    DbVolume volume = volumeBuilder.build();
+
+    // The initial state of the DbVolume should be "NOT_FORMATTED" when
+    // clusterID is not specified and the version file should not be written
+    // to disk.
+    assertNull(volume.getClusterID());
+    assertEquals(StorageType.DEFAULT, volume.getStorageType());
+    assertEquals(HddsVolume.VolumeState.NOT_FORMATTED,
+        volume.getStorageState());
+    assertFalse("Version file should not be created when clusterID is not " +
+        "known.", versionFile.exists());
+
+    // Format the volume with clusterID.
+    volume.format(CLUSTER_ID);
+    volume.createWorkingDir(CLUSTER_ID, null);
+
+    // The clusterIdDir should be created
+    File clusterIdDir = new File(volume.getStorageDir(), CLUSTER_ID);
+    assertTrue(clusterIdDir.exists());
+
+    // Create some subdirectories to mock db instances under this volume.
+    int numSubDirs = 5;
+    File[] subdirs = new File[numSubDirs];
+    for (int i = 0; i < numSubDirs; i++) {
+      subdirs[i] = new File(clusterIdDir, UUID.randomUUID().toString());
+      boolean res = subdirs[i].mkdir();
+      assertTrue(res);
+    }
+
+    // Rebuild the same volume to simulate a DN restart.
+    volume = volumeBuilder.build();
+    assertEquals(numSubDirs, volume.getHddsVolumeIDs().size());
+  }
+
+  @Test
+  public void testDbStoreClosedOnBadDbVolume() throws IOException {
+    ContainerTestUtils.enableSchemaV3(CONF);
+
+    DbVolume dbVolume = volumeBuilder.build();
+    dbVolume.format(CLUSTER_ID);
+    dbVolume.createWorkingDir(CLUSTER_ID, null);
+
+    MutableVolumeSet dbVolumeSet = mock(MutableVolumeSet.class);
+    when(dbVolumeSet.getVolumesList())
+        .thenReturn(Collections.singletonList(dbVolume));
+
+    MutableVolumeSet hddsVolumeSet = createHddsVolumeSet(3);
+    for (HddsVolume hddsVolume : StorageVolumeUtil.getHddsVolumesList(
+        hddsVolumeSet.getVolumesList())) {
+      hddsVolume.format(CLUSTER_ID);
+      hddsVolume.createWorkingDir(CLUSTER_ID, dbVolumeSet);
+    }
+
+    // The db handles should be in the cache
+    assertEquals(3, DatanodeStoreCache.getInstance().size());
+
+    // Make the dbVolume a bad volume
+    dbVolume.failVolume();
+
+    // The db handles should be removed from the cache
+    assertEquals(0, DatanodeStoreCache.getInstance().size());
+  }
+
+  private MutableVolumeSet createHddsVolumeSet(int volumeNum)
+      throws IOException {
+    File[] hddsVolumeDirs = new File[volumeNum];
+    StringBuilder hddsDirs = new StringBuilder();
+    for (int i = 0; i < volumeNum; i++) {
+      hddsVolumeDirs[i] = folder.newFolder();
+      hddsDirs.append(hddsVolumeDirs[i]).append(",");
+    }
+    CONF.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, hddsDirs.toString());
+    MutableVolumeSet hddsVolumeSet = new MutableVolumeSet(DATANODE_UUID,
+        CLUSTER_ID, CONF, null, StorageVolume.VolumeType.DATA_VOLUME, null);
+    return hddsVolumeSet;
+  }
+}
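
For orientation, testInitializeNonEmptyDbVolume above implies how a DbVolume
re-discovers the db instances it hosts after a restart: each subdirectory of
<storage dir>/<clusterId> is named after an HddsVolume's storageID. A minimal
sketch of that scan, assuming names like dbVolume and clusterId are in scope
(the real logic lives in DbVolume.java; this is illustrative, not the
committed code):

    // One <storageId> subdirectory per HddsVolume that keeps its db here.
    List<String> hddsVolumeIDs = new ArrayList<>();
    File clusterIdDir = new File(dbVolume.getStorageDir(), clusterId);
    File[] storageIdDirs = clusterIdDir.listFiles(File::isDirectory);
    if (storageIdDirs != null) {
      for (File storageIdDir : storageIdDirs) {
        hddsVolumeIDs.add(storageIdDir.getName());
      }
    }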
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolume.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolume.java
index 3f664b48c8..9f26a0b061 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolume.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolume.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.container.common.volume;
 import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
-import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -32,15 +31,20 @@ import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory;
 import org.apache.hadoop.hdds.fs.SpaceUsagePersistence;
 import org.apache.hadoop.hdds.fs.SpaceUsageSource;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.ozone.container.common.helpers.DatanodeVersionFile;
-import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 
 import static org.apache.hadoop.hdds.fs.MockSpaceUsagePersistence.inMemory;
 import static org.apache.hadoop.hdds.fs.MockSpaceUsageSource.fixed;
+import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.utils.DatanodeStoreCache;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -71,7 +75,7 @@ public class TestHddsVolume {
         .datanodeUuid(DATANODE_UUID)
         .conf(CONF)
         .usageCheckFactory(MockSpaceUsageCheckFactory.NONE);
-    versionFile = HddsVolumeUtil.getVersionFile(rootDir);
+    versionFile = StorageVolumeUtil.getVersionFile(rootDir);
   }
 
   @Test
@@ -100,31 +104,6 @@ public class TestHddsVolume {
     assertEquals(HddsVolume.VolumeState.NORMAL, volume.getStorageState());
   }
 
-  @Test
-  public void testReadPropertiesFromVersionFile() throws Exception {
-    HddsVolume volume = volumeBuilder.build();
-
-    volume.format(CLUSTER_ID);
-
-    Properties properties = DatanodeVersionFile.readFrom(versionFile);
-
-    String storageID = HddsVolumeUtil.getStorageID(properties, versionFile);
-    String clusterID = HddsVolumeUtil.getClusterID(
-        properties, versionFile, CLUSTER_ID);
-    String datanodeUuid = HddsVolumeUtil.getDatanodeUUID(
-        properties, versionFile, DATANODE_UUID);
-    long cTime = HddsVolumeUtil.getCreationTime(
-        properties, versionFile);
-    int layoutVersion = HddsVolumeUtil.getLayOutVersion(
-        properties, versionFile);
-
-    assertEquals(volume.getStorageID(), storageID);
-    assertEquals(volume.getClusterID(), clusterID);
-    assertEquals(volume.getDatanodeUuid(), datanodeUuid);
-    assertEquals(volume.getCTime(), cTime);
-    assertEquals(volume.getLayoutVersion(), layoutVersion);
-  }
-
   @Test
   public void testShutdown() throws Exception {
     long initialUsedSpace = 250;
@@ -276,4 +255,131 @@ public class TestHddsVolume {
     // Shutdown the volume.
     volume.shutdown();
   }
+
+  @Test
+  public void testDbStoreCreatedWithoutDbVolumes() throws IOException {
+    ContainerTestUtils.enableSchemaV3(CONF);
+
+    HddsVolume volume = volumeBuilder.build();
+    volume.format(CLUSTER_ID);
+    volume.createWorkingDir(CLUSTER_ID, null);
+
+    // No DbVolume is chosen, so the HddsVolume itself holds
+    // the db instance.
+    assertNull(volume.getDbVolume());
+    File storageIdDir = new File(new File(volume.getStorageDir(),
+        CLUSTER_ID), volume.getStorageID());
+    assertEquals(volume.getDbParentDir(), storageIdDir);
+
+    // The db directory should exist
+    File containerDBFile = new File(volume.getDbParentDir(),
+        CONTAINER_DB_NAME);
+    assertTrue(containerDBFile.exists());
+
+    volume.shutdown();
+  }
+
+  @Test
+  public void testDbStoreCreatedWithDbVolumes() throws IOException {
+    ContainerTestUtils.enableSchemaV3(CONF);
+
+    // create the DbVolumeSet
+    MutableVolumeSet dbVolumeSet = createDbVolumeSet();
+
+    HddsVolume volume = volumeBuilder.build();
+    volume.format(CLUSTER_ID);
+    volume.createWorkingDir(CLUSTER_ID, dbVolumeSet);
+
+    // DbVolume chosen.
+    assertNotNull(volume.getDbVolume());
+
+    File storageIdDir = new File(new File(volume.getDbVolume()
+        .getStorageDir(), CLUSTER_ID), volume.getStorageID());
+    // Db parent dir should be set to a subdir under the dbVolume.
+    assertEquals(volume.getDbParentDir(), storageIdDir);
+
+    // The db directory should exist
+    File containerDBFile = new File(volume.getDbParentDir(),
+        CONTAINER_DB_NAME);
+    assertTrue(containerDBFile.exists());
+
+    volume.shutdown();
+  }
+
+  @Test
+  public void testDbStoreClosedOnBadVolumeWithoutDbVolumes()
+      throws IOException {
+    ContainerTestUtils.enableSchemaV3(CONF);
+
+    HddsVolume volume = volumeBuilder.build();
+    volume.format(CLUSTER_ID);
+    volume.createWorkingDir(CLUSTER_ID, null);
+
+    // No DbVolume is chosen, so the HddsVolume itself holds
+    // the db instance.
+    assertNull(volume.getDbVolume());
+    File storageIdDir = new File(new File(volume.getStorageDir(),
+        CLUSTER_ID), volume.getStorageID());
+    assertEquals(volume.getDbParentDir(), storageIdDir);
+
+    // The db directory should exist
+    File containerDBFile = new File(volume.getDbParentDir(),
+        CONTAINER_DB_NAME);
+    assertTrue(containerDBFile.exists());
+    assertNotNull(DatanodeStoreCache.getInstance().getDB(
+        containerDBFile.getAbsolutePath()));
+
+    // Make it a bad volume
+    volume.failVolume();
+
+    // The db should be removed from cache
+    assertNull(DatanodeStoreCache.getInstance().getDB(
+        containerDBFile.getAbsolutePath()));
+  }
+
+  @Test
+  public void testDbStoreClosedOnBadVolumeWithDbVolumes() throws IOException {
+    ContainerTestUtils.enableSchemaV3(CONF);
+
+    // create the DbVolumeSet
+    MutableVolumeSet dbVolumeSet = createDbVolumeSet();
+
+    HddsVolume volume = volumeBuilder.build();
+    volume.format(CLUSTER_ID);
+    volume.createWorkingDir(CLUSTER_ID, dbVolumeSet);
+
+    // DbVolume chosen.
+    assertNotNull(volume.getDbVolume());
+
+    File storageIdDir = new File(new File(volume.getDbVolume()
+        .getStorageDir(), CLUSTER_ID), volume.getStorageID());
+    // Db parent dir should be set to a subdir under the dbVolume.
+    assertEquals(volume.getDbParentDir(), storageIdDir);
+
+    // The db directory should exist
+    File containerDBFile = new File(volume.getDbParentDir(),
+        CONTAINER_DB_NAME);
+    assertTrue(containerDBFile.exists());
+    assertNotNull(DatanodeStoreCache.getInstance().getDB(
+        containerDBFile.getAbsolutePath()));
+
+    // Make it a bad volume
+    volume.failVolume();
+
+    // The db should be removed from cache
+    assertNull(DatanodeStoreCache.getInstance().getDB(
+        containerDBFile.getAbsolutePath()));
+  }
+
+  private MutableVolumeSet createDbVolumeSet() throws IOException {
+    File dbVolumeDir = folder.newFolder();
+    CONF.set(OzoneConfigKeys.HDDS_DATANODE_CONTAINER_DB_DIR,
+        dbVolumeDir.getAbsolutePath());
+    MutableVolumeSet dbVolumeSet = new MutableVolumeSet(DATANODE_UUID,
+        CLUSTER_ID, CONF, null, StorageVolume.VolumeType.DB_VOLUME,
+        null);
+    dbVolumeSet.getVolumesList().get(0).format(CLUSTER_ID);
+    dbVolumeSet.getVolumesList().get(0).createWorkingDir(CLUSTER_ID, null);
+    return dbVolumeSet;
+  }
 }
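
Taken together, the four schema V3 tests above pin down the per-disk db layout
introduced by this change. As a sketch (the bracketed names are placeholders
derived from the assertions; container.db is the CONTAINER_DB_NAME constant):

    Without dedicated db volumes:
      <hddsVolume storage dir>/<clusterId>/<storageId>/container.db

    With a db volume configured:
      <dbVolume storage dir>/<clusterId>/<storageId>/container.db

In both cases the db parent dir is keyed by the HddsVolume's storageID, which
is what lets a restarted datanode re-associate each HddsVolume with its db
instance, or detect a missing one, as TestHddsVolumeUtil exercises above.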
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolume.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolume.java
new file mode 100644
index 0000000000..5f015204fa
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolume.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.volume;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory;
+import org.apache.hadoop.ozone.container.common.helpers.DatanodeVersionFile;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for StorageVolume.
+ */
+public class TestStorageVolume {
+
+  private static final String DATANODE_UUID = UUID.randomUUID().toString();
+  private static final String CLUSTER_ID = UUID.randomUUID().toString();
+  private static final OzoneConfiguration CONF = new OzoneConfiguration();
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  private HddsVolume.Builder volumeBuilder;
+  private File versionFile;
+
+  @Before
+  public void setup() throws Exception {
+    File rootDir = new File(folder.getRoot(), HddsVolume.HDDS_VOLUME_DIR);
+    volumeBuilder = new HddsVolume.Builder(folder.getRoot().getPath())
+        .datanodeUuid(DATANODE_UUID)
+        .conf(CONF)
+        .usageCheckFactory(MockSpaceUsageCheckFactory.NONE);
+    versionFile = StorageVolumeUtil.getVersionFile(rootDir);
+  }
+
+  @Test
+  public void testReadPropertiesFromVersionFile() throws Exception {
+    HddsVolume volume = volumeBuilder.build();
+
+    volume.format(CLUSTER_ID);
+
+    Properties properties = DatanodeVersionFile.readFrom(versionFile);
+
+    String storageID = StorageVolumeUtil.getStorageID(properties, versionFile);
+    String clusterID = StorageVolumeUtil.getClusterID(
+        properties, versionFile, CLUSTER_ID);
+    String datanodeUuid = StorageVolumeUtil.getDatanodeUUID(
+        properties, versionFile, DATANODE_UUID);
+    long cTime = StorageVolumeUtil.getCreationTime(
+        properties, versionFile);
+    int layoutVersion = StorageVolumeUtil.getLayOutVersion(
+        properties, versionFile);
+
+    assertEquals(volume.getStorageID(), storageID);
+    assertEquals(volume.getClusterID(), clusterID);
+    assertEquals(volume.getDatanodeUuid(), datanodeUuid);
+    assertEquals(volume.getCTime(), cTime);
+    assertEquals(volume.getLayoutVersion(), layoutVersion);
+  }
+}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
index 76e771ddcd..84263de93c 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -117,6 +118,7 @@ public class TestVolumeSetDiskChecks {
     final int numBadVolumes = 2;
 
     conf = getConfWithDataNodeDirs(numVolumes);
+    ContainerTestUtils.enableSchemaV3(conf);
     StorageVolumeChecker dummyChecker =
         new DummyChecker(conf, new Timer(), numBadVolumes);
     final MutableVolumeSet volumeSet = new MutableVolumeSet(
@@ -127,6 +129,10 @@ public class TestVolumeSetDiskChecks {
         UUID.randomUUID().toString(), conf, null,
         StorageVolume.VolumeType.META_VOLUME,
         dummyChecker);
+    final MutableVolumeSet dbVolumeSet = new MutableVolumeSet(
+        UUID.randomUUID().toString(), conf, null,
+        StorageVolume.VolumeType.DB_VOLUME,
+        dummyChecker);
 
     Assert.assertEquals(volumeSet.getFailedVolumesList().size(),
         numBadVolumes);
@@ -136,8 +142,14 @@ public class TestVolumeSetDiskChecks {
         numBadVolumes);
     Assert.assertEquals(metaVolumeSet.getVolumesList().size(),
         numVolumes - numBadVolumes);
+    Assert.assertEquals(dbVolumeSet.getFailedVolumesList().size(),
+        numBadVolumes);
+    Assert.assertEquals(dbVolumeSet.getVolumesList().size(),
+        numVolumes - numBadVolumes);
+
     volumeSet.shutdown();
     metaVolumeSet.shutdown();
+    dbVolumeSet.shutdown();
   }
 
   /**
@@ -148,6 +160,7 @@ public class TestVolumeSetDiskChecks {
     final int numVolumes = 5;
 
     conf = getConfWithDataNodeDirs(numVolumes);
+    ContainerTestUtils.enableSchemaV3(conf);
     StorageVolumeChecker dummyChecker =
         new DummyChecker(conf, new Timer(), numVolumes);
 
@@ -159,13 +172,21 @@ public class TestVolumeSetDiskChecks {
         UUID.randomUUID().toString(), conf, null,
         StorageVolume.VolumeType.META_VOLUME,
         dummyChecker);
+    final MutableVolumeSet dbVolumeSet = new MutableVolumeSet(
+        UUID.randomUUID().toString(), conf, null,
+        StorageVolume.VolumeType.DB_VOLUME,
+        dummyChecker);
 
     assertEquals(volumeSet.getFailedVolumesList().size(), numVolumes);
     assertEquals(volumeSet.getVolumesList().size(), 0);
     assertEquals(metaVolumeSet.getFailedVolumesList().size(), numVolumes);
     assertEquals(metaVolumeSet.getVolumesList().size(), 0);
+    assertEquals(dbVolumeSet.getFailedVolumesList().size(), numVolumes);
+    assertEquals(dbVolumeSet.getVolumesList().size(), 0);
+
     volumeSet.shutdown();
     metaVolumeSet.shutdown();
+    dbVolumeSet.shutdown();
   }
 
   /**
@@ -188,10 +209,19 @@ public class TestVolumeSetDiskChecks {
     }
     ozoneConf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
         String.join(",", metaDirs));
+
+    final List<String> dbDirs = new ArrayList<>();
+    for (int i = 0; i < numDirs; ++i) {
+      dbDirs.add(GenericTestUtils.getRandomizedTestDir().getPath());
+    }
+    ozoneConf.set(OzoneConfigKeys.HDDS_DATANODE_CONTAINER_DB_DIR,
+        String.join(",", dbDirs));
+
     DatanodeConfiguration dnConf =
         ozoneConf.getObject(DatanodeConfiguration.class);
     dnConf.setFailedDataVolumesTolerated(numDirs * 2);
     dnConf.setFailedMetadataVolumesTolerated(numDirs * 2);
+    dnConf.setFailedDbVolumesTolerated(numDirs * 2);
     ozoneConf.setFromObject(dnConf);
     return ozoneConf;
   }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index 35fef4a12d..92ebe0a7f2 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
@@ -59,6 +60,7 @@ import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.util.Random;
 import java.util.UUID;
 import java.util.HashMap;
@@ -184,6 +186,17 @@ public class TestOzoneContainer {
     conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
             String.join(",",
             path + "/ratis1", path + "/ratis2", path + "ratis3"));
+
+    File[] dbPaths = new File[3];
+    StringBuilder dbDirString = new StringBuilder();
+    for (int i = 0; i < 3; i++) {
+      dbPaths[i] = folder.newFolder();
+      dbDirString.append(dbPaths[i]).append(",");
+    }
+    conf.set(OzoneConfigKeys.HDDS_DATANODE_CONTAINER_DB_DIR,
+        dbDirString.toString());
+    ContainerTestUtils.enableSchemaV3(conf);
+
     DatanodeStateMachine stateMachine = Mockito.mock(
             DatanodeStateMachine.class);
     StateContext context = Mockito.mock(StateContext.class);
@@ -199,7 +212,8 @@ public class TestOzoneContainer {
     Assert.assertEquals(3,
             ozoneContainer.getNodeReport().getMetadataStorageReportList()
                     .size());
-
+    Assert.assertEquals(3,
+            ozoneContainer.getNodeReport().getDbStorageReportList().size());
   }
 
   @Test
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
index d882ca4ed4..0519007cf0 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
@@ -192,7 +192,7 @@ public class TestDatanodeUpgradeToScmHA {
     // restarted with SCM HA config and gets a different SCM ID.
     conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
     changeScmID();
-    restartDatanode(HDDSLayoutFeature.INITIAL_VERSION.layoutVersion());
+    restartDatanode(HDDSLayoutFeature.INITIAL_VERSION.layoutVersion(), false);
     // Make sure the existing container can be read.
     readChunk(exportWriteChunk2, pipeline);
 
@@ -289,7 +289,7 @@ public class TestDatanodeUpgradeToScmHA {
 
     /// FINALIZED: Restart datanode to upgrade the failed volume ///
 
-    restartDatanode(HDDSLayoutFeature.SCM_HA.layoutVersion());
+    restartDatanode(HDDSLayoutFeature.SCM_HA.layoutVersion(), true);
 
     Assert.assertEquals(1,
         dsm.getContainer().getVolumeSet().getVolumesList().size());
@@ -344,7 +344,7 @@ public class TestDatanodeUpgradeToScmHA {
     changeScmID();
     // A new volume is added that must be formatted.
     File preFinVolume2 = addVolume();
-    restartDatanode(HDDSLayoutFeature.INITIAL_VERSION.layoutVersion());
+    restartDatanode(HDDSLayoutFeature.INITIAL_VERSION.layoutVersion(), false);
 
     Assert.assertEquals(2,
         dsm.getContainer().getVolumeSet().getVolumesList().size());
@@ -378,7 +378,7 @@ public class TestDatanodeUpgradeToScmHA {
     File finVolume = addVolume();
     // Yet another SCM ID is received this time, but it should not matter.
     changeScmID();
-    restartDatanode(HDDSLayoutFeature.SCM_HA.layoutVersion());
+    restartDatanode(HDDSLayoutFeature.SCM_HA.layoutVersion(), true);
     Assert.assertEquals(3,
         dsm.getContainer().getVolumeSet().getVolumesList().size());
     Assert.assertEquals(0,
@@ -521,7 +521,7 @@ public class TestDatanodeUpgradeToScmHA {
     callVersionEndpointTask();
   }
 
-  public void restartDatanode(int expectedMlv)
+  public void restartDatanode(int expectedMlv, boolean afterFinalize)
       throws Exception {
     // Stop existing datanode.
     DatanodeDetails dd = dsm.getDatanodeDetails();
@@ -531,10 +531,29 @@ public class TestDatanodeUpgradeToScmHA {
     dsm = new DatanodeStateMachine(dd,
         conf, null, null,
         null);
+
+    if (afterFinalize) {
+      // After FINALIZE, the mlv should be >= SCM_HA.
+      // NOTE: Until a LayoutFeature newer than SCM_HA exists,
+      // the mlv can be checked for equality with SCM_HA,
+      // but once one is added, we can only check mlv >= SCM_HA.
+      checkMlvAtLeast(expectedMlv);
+    } else {
+      // Before FINALIZE, the mlv should stay at the old version,
+      // e.g. INITIAL_VERSION.
+      checkMlvExact(expectedMlv);
+    }
+
+    callVersionEndpointTask();
+  }
+
+  private void checkMlvExact(int expectedMlv) {
     int mlv = dsm.getLayoutVersionManager().getMetadataLayoutVersion();
     Assert.assertEquals(expectedMlv, mlv);
+  }
 
-    callVersionEndpointTask();
+  private void checkMlvAtLeast(int expectedMlv) {
+    int mlv = dsm.getLayoutVersionManager().getMetadataLayoutVersion();
+    Assert.assertTrue(expectedMlv <= mlv);
   }
 
   /**
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
index 98821d9615..394c4e30e8 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
@@ -90,6 +90,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTER
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_INFO_WAIT_DURATION;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_INFO_WAIT_DURATION_DEFAULT;
 import static org.apache.hadoop.hdds.server.ServerUtils.sanitizeUserArgs;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_DATANODE_CONTAINER_DB_DIR;
 
 import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
@@ -390,6 +391,12 @@ public final class HddsServerUtil {
     return rawLocations;
   }
 
+  public static Collection<String> getDatanodeDbDirs(
+      ConfigurationSource conf) {
+    // No fallback here, since this config is optional.
+    return conf.getTrimmedStringCollection(HDDS_DATANODE_CONTAINER_DB_DIR);
+  }
+
   /**
    * Get the path for datanode id file.
    *
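
A sketch of how a caller might consume the new helper, mirroring the
MutableVolumeSet construction used in the tests above. The empty-collection
guard is an assumption about caller behavior, and datanodeId and conf are
assumed to be in scope; this is not code from this commit:

    // Hypothetical wiring in a datanode component.
    Collection<String> dbDirs = HddsServerUtil.getDatanodeDbDirs(conf);
    if (dbDirs.isEmpty()) {
      // The HDDS_DATANODE_CONTAINER_DB_DIR setting is optional: without it,
      // each HddsVolume hosts its own db instance under
      // <clusterId>/<storageId>.
    } else {
      // With it, a dedicated db volume set is built, and HddsVolumes place
      // their db instances on one of these volumes instead.
      MutableVolumeSet dbVolumeSet = new MutableVolumeSet(datanodeId, conf,
          null, StorageVolume.VolumeType.DB_VOLUME, null);
    }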
diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index ce4cdd0750..35d878489e 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -159,6 +159,7 @@ message SCMNodeAddressList {
 message NodeReportProto {
   repeated StorageReportProto storageReport = 1;
   repeated MetadataStorageReportProto metadataStorageReport = 2;
+  repeated StorageReportProto dbStorageReport = 3;
 }
 
 message StorageReportProto {
diff --git a/hadoop-ozone/dist/src/main/compose/upgrade/upgrades/non-rolling-upgrade/1.1.0-1.2.0/callback.sh b/hadoop-ozone/dist/src/main/compose/upgrade/upgrades/non-rolling-upgrade/1.1.0-1.2.0/callback.sh
index b533e6c03d..469cc4c49a 100755
--- a/hadoop-ozone/dist/src/main/compose/upgrade/upgrades/non-rolling-upgrade/1.1.0-1.2.0/callback.sh
+++ b/hadoop-ozone/dist/src/main/compose/upgrade/upgrades/non-rolling-upgrade/1.1.0-1.2.0/callback.sh
@@ -63,7 +63,7 @@ with_old_version_downgraded() {
 }
 
 with_new_version_finalized() {
-  _check_hdds_mlvs 2
+  _check_hdds_mlvs 3
   # OM currently only has one layout version.
   _check_om_mlvs 0
 
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
index 1153c4d5c7..c5b21d604f 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.ozone.container.common.helpers.DatanodeVersionFile;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
-import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
@@ -191,7 +190,7 @@ public class ContainerCommands implements Callable<Void>, SubcommandWithParent {
           "Version file " + versionFile + " is missing");
     }
 
-    return HddsVolumeUtil
+    return StorageVolumeUtil
         .getProperty(props, OzoneConsts.DATANODE_UUID, versionFile);
   }
 
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/containergenerator/GeneratorDatanode.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/containergenerator/GeneratorDatanode.java
index 02fa7e6373..1d21e9bb71 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/containergenerator/GeneratorDatanode.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/containergenerator/GeneratorDatanode.java
@@ -52,7 +52,7 @@ import org.apache.hadoop.ozone.container.common.helpers.DatanodeVersionFile;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage;
-import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
@@ -160,9 +160,9 @@ public class GeneratorDatanode extends BaseGenerator {
           "Version file " + versionFile + " is missing");
     }
 
-    String clusterId =
-        HddsVolumeUtil.getProperty(props, OzoneConsts.CLUSTER_ID, versionFile);
-    datanodeId = HddsVolumeUtil
+    String clusterId = StorageVolumeUtil.getProperty(props,
+        OzoneConsts.CLUSTER_ID, versionFile);
+    datanodeId = StorageVolumeUtil
         .getProperty(props, OzoneConsts.DATANODE_UUID, versionFile);
 
     volumeSet = new MutableVolumeSet(datanodeId, clusterId, config, null,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org