Posted to commits@ozone.apache.org by sa...@apache.org on 2022/05/05 09:55:57 UTC

[ozone] branch HDDS-3630 updated: HDDS-6542. [Merge rocksdb in datanode] KeyValueContainer operation adaptation for schema v3 containers. (#3346)

This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-3630
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3630 by this push:
     new de18c59022 HDDS-6542. [Merge rocksdb in datanode] KeyValueContainer operation adaptation for schema v3 containers. (#3346)
de18c59022 is described below

commit de18c59022c38b8c990e4d8e3360d9b9a6fe24d6
Author: Gui Hecheng <ma...@tencent.com>
AuthorDate: Thu May 5 17:55:52 2022 +0800

    HDDS-6542. [Merge rocksdb in datanode] KeyValueContainer operation adaptation for schema v3 containers. (#3346)
---
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   3 +-
 .../container/keyvalue/KeyValueContainer.java      |  86 ++++++-----
 .../container/keyvalue/KeyValueContainerCheck.java |   9 +-
 .../container/keyvalue/KeyValueContainerData.java  |  16 +-
 .../KeyValueContainerMetadataInspector.java        |  47 ++++--
 .../ozone/container/keyvalue/KeyValueHandler.java  |   9 +-
 .../container/keyvalue/helpers/BlockUtils.java     |  32 ++--
 .../helpers/KeyValueContainerLocationUtil.java     |  12 +-
 .../keyvalue/helpers/KeyValueContainerUtil.java    | 168 ++++++++++++---------
 .../metadata/DatanodeStoreSchemaThreeImpl.java     |  12 ++
 .../ozone/container/metadata/DatanodeTable.java    |   6 +
 .../ozone/container/common/ContainerTestUtils.java |  19 +++
 .../container/common/TestBlockDeletingService.java |  51 +++----
 .../common/TestKeyValueContainerData.java          |  20 ++-
 .../TestSchemaOneBackwardsCompatibility.java       |  18 ++-
 .../common/impl/TestContainerPersistence.java      |  26 +++-
 .../keyvalue/ContainerTestVersionInfo.java         |  76 ++++++++++
 .../keyvalue/TestKeyValueBlockIterator.java        |  34 +++--
 .../container/keyvalue/TestKeyValueContainer.java  |  34 +++--
 .../keyvalue/TestKeyValueContainerCheck.java       |  12 +-
 .../TestKeyValueContainerIntegrityChecks.java      |  35 +++--
 .../TestKeyValueContainerMetadataInspector.java    |   6 +-
 .../container/keyvalue/TestTarContainerPacker.java |  27 +++-
 .../keyvalue/impl/TestBlockManagerImpl.java        |  23 ++-
 .../container/ozoneimpl/TestContainerReader.java   |  61 ++++++--
 .../container/ozoneimpl/TestOzoneContainer.java    |  25 +--
 .../org/apache/hadoop/hdds/utils/db/RDBTable.java  |  10 ++
 .../org/apache/hadoop/hdds/utils/db/Table.java     |  10 ++
 .../apache/hadoop/hdds/utils/db/TypedTable.java    |   6 +
 29 files changed, 615 insertions(+), 278 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 33ee7a5398..71c802c3a9 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -277,9 +277,8 @@ public final class OzoneConsts {
   // but have containerID as key prefixes.
   public static final String SCHEMA_V3 = "3";
 
-  // TODO(markgui): Add SCHEMA_V3 until it is fully supported.
   public static final String[] SCHEMA_VERSIONS =
-      new String[] {SCHEMA_V1, SCHEMA_V2};
+      new String[] {SCHEMA_V1, SCHEMA_V2, SCHEMA_V3};
 
   // Supported store types.
   public static final String OZONE = "ozone";
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 1060ae8a20..81cadf3178 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -137,10 +137,18 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
           StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()),
           maxSize);
       String hddsVolumeDir = containerVolume.getHddsRootDir().toString();
+      // Set volume before getContainerDBFile(), because we may need the
+      // volume to deduce the db file.
+      containerData.setVolume(containerVolume);
 
       long containerID = containerData.getContainerID();
       String idDir = VersionedDatanodeFeatures.ScmHA.chooseContainerPathID(
               containerVolume, clusterId);
+      // Set schemaVersion before getting the dbFile, since the
+      // dbFile location is chosen based on the schema version.
+      String schemaVersion = VersionedDatanodeFeatures.SchemaV3
+          .chooseSchemaVersion(config);
+      containerData.setSchemaVersion(schemaVersion);
 
       containerMetaDataPath = KeyValueContainerLocationUtil
           .getContainerMetaDataPath(hddsVolumeDir, idDir, containerID);
@@ -155,8 +163,6 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
       //Create Metadata path chunks path and metadata db
       File dbFile = getContainerDBFile();
 
-      containerData.setSchemaVersion(
-          VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion());
       KeyValueContainerUtil.createContainerMetaData(
               containerMetaDataPath, chunksPath, dbFile,
               containerData.getSchemaVersion(), config);
@@ -164,7 +170,6 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
       //Set containerData for the KeyValueContainer.
       containerData.setChunksPath(chunksPath.getPath());
       containerData.setDbFile(dbFile);
-      containerData.setVolume(containerVolume);
 
       // Create .container file
       File containerFile = getContainerFile();
@@ -202,26 +207,24 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
    *
    * @param clusterId
    * @param containerVolume
-   * @param hddsVolumeDir
    */
   public void populatePathFields(String clusterId,
-      HddsVolume containerVolume, String hddsVolumeDir) {
+      HddsVolume containerVolume) {
 
     long containerId = containerData.getContainerID();
+    String hddsVolumeDir = containerVolume.getHddsRootDir().getAbsolutePath();
 
     File containerMetaDataPath = KeyValueContainerLocationUtil
         .getContainerMetaDataPath(hddsVolumeDir, clusterId, containerId);
 
     File chunksPath = KeyValueContainerLocationUtil.getChunksLocationPath(
         hddsVolumeDir, clusterId, containerId);
-    File dbFile = KeyValueContainerLocationUtil.getContainerDBFile(
-        containerMetaDataPath, containerId);
 
     //Set containerData for the KeyValueContainer.
     containerData.setMetadataPath(containerMetaDataPath.getPath());
     containerData.setChunksPath(chunksPath.getPath());
-    containerData.setDbFile(dbFile);
     containerData.setVolume(containerVolume);
+    containerData.setDbFile(getContainerDBFile());
   }
 
   /**
@@ -331,36 +334,14 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
 
   @Override
   public void quasiClose() throws StorageContainerException {
-    // The DB must be synced during close operation
-    flushAndSyncDB();
-
-    writeLock();
-    try {
-      // Second sync should be a very light operation as sync has already
-      // been done outside the lock.
-      flushAndSyncDB();
-      updateContainerData(containerData::quasiCloseContainer);
-      clearPendingPutBlockCache();
-    } finally {
-      writeUnlock();
-    }
+    closeAndFlushIfNeeded(containerData::quasiCloseContainer,
+        !containerData.getSchemaVersion().equals(OzoneConsts.SCHEMA_V3));
   }
 
   @Override
   public void close() throws StorageContainerException {
-    // The DB must be synced during close operation
-    flushAndSyncDB();
-
-    writeLock();
-    try {
-      // Second sync should be a very light operation as sync has already
-      // been done outside the lock.
-      flushAndSyncDB();
-      updateContainerData(containerData::closeContainer);
-      clearPendingPutBlockCache();
-    } finally {
-      writeUnlock();
-    }
+    closeAndFlushIfNeeded(containerData::closeContainer,
+        !containerData.getSchemaVersion().equals(OzoneConsts.SCHEMA_V3));
     LOG.info("Container {} is closed with bcsId {}.",
         containerData.getContainerID(),
         containerData.getBlockCommitSequenceId());
@@ -377,6 +358,36 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
     }
   }
 
+  /**
+   * For db-per-container schemas, the DB must be synced during the
+   * close operation.
+   * For db-per-volume schemas, don't sync the whole db when a
+   * single container is closed.
+   *
+   * @param closer
+   * @param flush
+   * @throws StorageContainerException
+   */
+  private void closeAndFlushIfNeeded(Runnable closer, boolean flush)
+      throws StorageContainerException {
+    if (flush) {
+      flushAndSyncDB();
+    }
+
+    writeLock();
+    try {
+      if (flush) {
+        // Second sync should be a very light operation as sync has already
+        // been done outside the lock.
+        flushAndSyncDB();
+      }
+      updateContainerData(closer);
+      clearPendingPutBlockCache();
+    } finally {
+      writeUnlock();
+    }
+  }
+
   /**
    *
    * Must be invoked with the writeLock held.
@@ -800,8 +811,7 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
    * @return
    */
   public File getContainerDBFile() {
-    return new File(containerData.getMetadataPath(), containerData
-        .getContainerID() + OzoneConsts.DN_CONTAINER_DB);
+    return KeyValueContainerLocationUtil.getContainerDBFile(containerData);
   }
 
   @Override
@@ -809,7 +819,7 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
     long containerId = containerData.getContainerID();
     KeyValueContainerCheck checker =
         new KeyValueContainerCheck(containerData.getMetadataPath(), config,
-            containerId);
+            containerId, containerData.getVolume());
     return checker.fastCheck();
   }
 
@@ -830,7 +840,7 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
     long containerId = containerData.getContainerID();
     KeyValueContainerCheck checker =
         new KeyValueContainerCheck(containerData.getMetadataPath(), config,
-            containerId);
+            containerId, containerData.getVolume());
 
     return checker.fullCheck(throttler, canceler);
   }
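
For context on the refactor above: close() and quasiClose() now share closeAndFlushIfNeeded(), and the DB is synced only for the per-container schemas (v1/v2); syncing the shared per-volume db of a schema v3 container on every close would be needlessly heavy. A minimal self-contained sketch of that decision, using illustrative stand-in types rather than the actual Ozone classes:

    // Sketch only: "Db" and the synchronized block stand in for the real
    // RocksDB store and the container's write lock.
    public class CloseSketch {
      static final String SCHEMA_V3 = "3";

      interface Db {
        void flushAndSync();
      }

      static void closeAndFlushIfNeeded(Db db, String schemaVersion,
          Runnable stateChange) {
        // v1/v2: the db belongs to this container alone, so sync on close.
        // v3: the db is shared by the whole volume, so skip the sync.
        boolean flush = !SCHEMA_V3.equals(schemaVersion);
        if (flush) {
          db.flushAndSync(); // heavy sync done outside the lock
        }
        synchronized (CloseSketch.class) {
          if (flush) {
            db.flushAndSync(); // second sync under the lock is cheap
          }
          stateChange.run(); // e.g. containerData::closeContainer
        }
      }
    }
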
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
index 5f3f1f8198..c560aabbe6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
 import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
@@ -64,15 +65,17 @@ public class KeyValueContainerCheck {
   private ConfigurationSource checkConfig;
 
   private String metadataPath;
+  private HddsVolume volume;
 
   public KeyValueContainerCheck(String metadataPath, ConfigurationSource conf,
-      long containerID) {
+      long containerID, HddsVolume volume) {
     Preconditions.checkArgument(metadataPath != null);
 
     this.checkConfig = conf;
     this.containerID = containerID;
     this.onDiskContainerData = null;
     this.metadataPath = metadataPath;
+    this.volume = volume;
   }
 
   /**
@@ -215,9 +218,8 @@ public class KeyValueContainerCheck {
     Preconditions.checkState(onDiskContainerData != null,
         "invoke loadContainerData prior to calling this function");
 
-    File metaDir = new File(metadataPath);
     File dbFile = KeyValueContainerLocationUtil
-        .getContainerDBFile(metaDir, containerID);
+        .getContainerDBFile(onDiskContainerData);
 
     if (!dbFile.exists() || !dbFile.canRead()) {
       String dbFileErrorMsg = "Unable to access DB File [" + dbFile.toString()
@@ -330,6 +332,7 @@ public class KeyValueContainerCheck {
 
     onDiskContainerData = (KeyValueContainerData) ContainerDataYaml
         .readContainerFile(containerFile);
+    onDiskContainerData.setVolume(volume);
   }
 
   private void handleCorruption(IOException e) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
index 18cdb88fa6..c8e4e60573 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
 import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
@@ -54,6 +55,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_VERSION;
 import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_BYTES_USED;
 import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_COUNT;
 import static org.apache.hadoop.ozone.OzoneConsts.PENDING_DELETE_BLOCK_COUNT;
+import static org.apache.hadoop.ozone.container.metadata.DatanodeSchemaThreeDBDefinition.getContainerKeyPrefix;
 
 /**
  * This class represents the KeyValueContainer metadata, which is the
@@ -113,12 +115,13 @@ public class KeyValueContainerData extends ContainerData {
     this.deleteTransactionId = 0;
   }
 
-  public KeyValueContainerData(ContainerData source) {
+  public KeyValueContainerData(KeyValueContainerData source) {
     super(source);
     Preconditions.checkArgument(source.getContainerType()
         == ContainerProtos.ContainerType.KeyValueContainer);
     this.numPendingDeletionBlocks = new AtomicLong(0);
     this.deleteTransactionId = 0;
+    this.schemaVersion = source.getSchemaVersion();
   }
 
   /**
@@ -375,6 +378,9 @@ public class KeyValueContainerData extends ContainerData {
    * @return
    */
   public String startKeyEmpty() {
+    if (schemaVersion.equals(OzoneConsts.SCHEMA_V3)) {
+      return getContainerKeyPrefix(getContainerID());
+    }
     return null;
   }
 
@@ -384,7 +390,10 @@ public class KeyValueContainerData extends ContainerData {
    * @return
    */
   public String containerPrefix() {
-    return null;
+    if (schemaVersion.equals(OzoneConsts.SCHEMA_V3)) {
+      return getContainerKeyPrefix(getContainerID());
+    }
+    return "";
   }
 
   /**
@@ -395,6 +404,9 @@ public class KeyValueContainerData extends ContainerData {
    * @return formatted key
    */
   private String formatKey(String key) {
+    if (schemaVersion.equals(OzoneConsts.SCHEMA_V3)) {
+      key = getContainerKeyPrefix(getContainerID()) + key;
+    }
     return key;
   }
 }
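
The startKeyEmpty()/containerPrefix()/formatKey() changes above scope every metadata key with a container prefix under schema v3, because all containers on a volume now share one RocksDB instance. A runnable sketch of the idea (the prefix encoding below is an assumption for illustration; the real one comes from DatanodeSchemaThreeDBDefinition.getContainerKeyPrefix()):

    public class KeyPrefixSketch {
      static final String SCHEMA_V3 = "3";

      // Hypothetical prefix encoding, for illustration only.
      static String containerKeyPrefix(long containerID) {
        return containerID + "|";
      }

      // Mirrors formatKey(): v3 keys carry the container prefix,
      // v1/v2 keys are used as-is.
      static String formatKey(String schemaVersion, long containerID,
          String key) {
        if (SCHEMA_V3.equals(schemaVersion)) {
          return containerKeyPrefix(containerID) + key;
        }
        return key;
      }

      public static void main(String[] args) {
        System.out.println(formatKey("3", 42, "#BLOCKCOUNT")); // 42|#BLOCKCOUNT
        System.out.println(formatKey("2", 42, "#BLOCKCOUNT")); // #BLOCKCOUNT
      }
    }
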
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerMetadataInspector.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerMetadataInspector.java
index 5dd7da3ee2..c6395de27d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerMetadataInspector.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerMetadataInspector.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerInspector;
 import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaThreeImpl;
 import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.slf4j.Logger;
@@ -203,7 +204,7 @@ public class KeyValueContainerMetadataInspector implements ContainerInspector {
 
       // Build DB metadata values.
       Table<String, Long> metadataTable = store.getMetadataTable();
-      JsonObject dBMetadata = getDBMetadataJson(metadataTable);
+      JsonObject dBMetadata = getDBMetadataJson(metadataTable, containerData);
       containerJson.add("dBMetadata", dBMetadata);
 
       // Build aggregate values.
@@ -223,20 +224,20 @@ public class KeyValueContainerMetadataInspector implements ContainerInspector {
     return containerJson;
   }
 
-  private JsonObject getDBMetadataJson(Table<String, Long> metadataTable)
-      throws IOException {
+  private JsonObject getDBMetadataJson(Table<String, Long> metadataTable,
+      KeyValueContainerData containerData) throws IOException {
     JsonObject dBMetadata = new JsonObject();
 
     dBMetadata.addProperty(OzoneConsts.BLOCK_COUNT,
-        metadataTable.get(OzoneConsts.BLOCK_COUNT));
+        metadataTable.get(containerData.blockCountKey()));
     dBMetadata.addProperty(OzoneConsts.CONTAINER_BYTES_USED,
-        metadataTable.get(OzoneConsts.CONTAINER_BYTES_USED));
+        metadataTable.get(containerData.bytesUsedKey()));
     dBMetadata.addProperty(OzoneConsts.PENDING_DELETE_BLOCK_COUNT,
-        metadataTable.get(OzoneConsts.PENDING_DELETE_BLOCK_COUNT));
+        metadataTable.get(containerData.pendingDeleteBlockCountKey()));
     dBMetadata.addProperty(OzoneConsts.DELETE_TRANSACTION_KEY,
-        metadataTable.get(OzoneConsts.DELETE_TRANSACTION_KEY));
+        metadataTable.get(containerData.latestDeleteTxnKey()));
     dBMetadata.addProperty(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID,
-        metadataTable.get(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID));
+        metadataTable.get(containerData.bcsIdKey()));
 
     return dBMetadata;
   }
@@ -277,6 +278,11 @@ public class KeyValueContainerMetadataInspector implements ContainerInspector {
           (DatanodeStoreSchemaTwoImpl) store;
       pendingDeleteBlockCountTotal =
           countPendingDeletesSchemaV2(schemaTwoStore);
+    } else if (schemaVersion.equals(OzoneConsts.SCHEMA_V3)) {
+      DatanodeStoreSchemaThreeImpl schemaThreeStore =
+          (DatanodeStoreSchemaThreeImpl) store;
+      pendingDeleteBlockCountTotal =
+          countPendingDeletesSchemaV3(schemaThreeStore, containerData);
     } else {
       throw new IOException("Failed to process deleted blocks for unknown " +
               "container schema " + schemaVersion);
@@ -308,8 +314,8 @@ public class KeyValueContainerMetadataInspector implements ContainerInspector {
     return chunksDirectory;
   }
 
-  private boolean checkAndRepair(JsonObject parent, ContainerData containerData,
-      DatanodeStore store) {
+  private boolean checkAndRepair(JsonObject parent,
+      KeyValueContainerData containerData, DatanodeStore store) {
     JsonArray errors = new JsonArray();
     boolean passed = true;
 
@@ -335,7 +341,7 @@ public class KeyValueContainerMetadataInspector implements ContainerInspector {
       BooleanSupplier keyRepairAction = () -> {
         boolean repaired = false;
         try {
-          metadataTable.put(OzoneConsts.BLOCK_COUNT,
+          metadataTable.put(containerData.blockCountKey(),
               blockCountAggregate.getAsLong());
           repaired = true;
         } catch (IOException ex) {
@@ -370,7 +376,7 @@ public class KeyValueContainerMetadataInspector implements ContainerInspector {
       BooleanSupplier keyRepairAction = () -> {
         boolean repaired = false;
         try {
-          metadataTable.put(OzoneConsts.CONTAINER_BYTES_USED,
+          metadataTable.put(containerData.bytesUsedKey(),
               usedBytesAggregate.getAsLong());
           repaired = true;
         } catch (IOException ex) {
@@ -451,6 +457,23 @@ public class KeyValueContainerMetadataInspector implements ContainerInspector {
     return pendingDeleteBlockCountTotal;
   }
 
+  private long countPendingDeletesSchemaV3(
+      DatanodeStoreSchemaThreeImpl schemaThreeStore,
+      KeyValueContainerData containerData) throws IOException {
+    long pendingDeleteBlockCountTotal = 0;
+    try (
+        TableIterator<String, ? extends Table.KeyValue<String,
+            DeletedBlocksTransaction>>
+            iter = schemaThreeStore.getDeleteTransactionTable()
+            .iterator(containerData.containerPrefix())) {
+      while (iter.hasNext()) {
+        DeletedBlocksTransaction delTx = iter.next().getValue();
+        pendingDeleteBlockCountTotal += delTx.getLocalIDList().size();
+      }
+      return pendingDeleteBlockCountTotal;
+    }
+  }
+
   private static long getBlockLength(BlockData block) {
     long blockLen = 0;
     List<ContainerProtos.ChunkInfo> chunkInfoList = block.getChunks();
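
countPendingDeletesSchemaV3() above works because schema v3 keeps the delete transactions of every container in one shared table, ordered by container key prefix: the iterator is opened at the container's prefix and the localID counts are summed. A self-contained sketch with a TreeMap standing in for the RocksDB-backed table:

    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class PendingDeleteSketch {
      // TreeMap stands in for the prefix-ordered delete transaction table;
      // values are the localID lists of each DeletedBlocksTransaction.
      static long countPendingDeletes(TreeMap<String, List<Long>> txns,
          String containerPrefix) {
        long total = 0;
        for (Map.Entry<String, List<Long>> e
            : txns.tailMap(containerPrefix).entrySet()) {
          if (!e.getKey().startsWith(containerPrefix)) {
            break; // past this container's key range
          }
          total += e.getValue().size();
        }
        return total;
      }
    }
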
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index b33e9b9f43..aa36738172 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -318,10 +318,9 @@ public class KeyValueHandler extends Handler {
       HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(
           StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()),
           container.getContainerData().getMaxSize());
-      String hddsVolumeDir = containerVolume.getHddsRootDir().toString();
       String idDir = VersionedDatanodeFeatures.ScmHA.chooseContainerPathID(
               containerVolume, clusterId);
-      container.populatePathFields(idDir, containerVolume, hddsVolumeDir);
+      container.populatePathFields(idDir, containerVolume);
     } finally {
       volumeSet.readUnlock();
     }
@@ -942,9 +941,11 @@ public class KeyValueHandler extends Handler {
       final InputStream rawContainerStream,
       final TarContainerPacker packer)
       throws IOException {
+    Preconditions.checkState(originalContainerData instanceof
+        KeyValueContainerData, "Should be KeyValueContainerData instance");
 
-    KeyValueContainerData containerData =
-        new KeyValueContainerData(originalContainerData);
+    KeyValueContainerData containerData = new KeyValueContainerData(
+        (KeyValueContainerData) originalContainerData);
 
     KeyValueContainer container = new KeyValueContainer(containerData,
         conf);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java
index 6959773d6c..fb5d5a72b2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java
@@ -151,17 +151,12 @@ public final class BlockUtils {
       ConfigurationSource conf) {
     Preconditions.checkNotNull(container);
     Preconditions.checkNotNull(container.getDbFile());
+    Preconditions.checkState(!container.getSchemaVersion()
+        .equals(OzoneConsts.SCHEMA_V3));
 
-    String containerDBPath = container.getDbFile().getAbsolutePath();
-    if (container.getSchemaVersion().equals(OzoneConsts.SCHEMA_V3)) {
-      DatanodeStoreCache cache = DatanodeStoreCache.getInstance();
-      Preconditions.checkNotNull(cache);
-      cache.removeDB(containerDBPath);
-    } else {
-      ContainerCache cache = ContainerCache.getInstance(conf);
-      Preconditions.checkNotNull(cache);
-      cache.removeDB(containerDBPath);
-    }
+    ContainerCache cache = ContainerCache.getInstance(conf);
+    Preconditions.checkNotNull(cache);
+    cache.removeDB(container.getDbFile().getAbsolutePath());
   }
 
   /**
@@ -237,4 +232,21 @@ public final class BlockUtils {
               + containerBCSId + ".", UNKNOWN_BCSID);
     }
   }
+
+  /**
+   * Remove container KV metadata from the per-volume db store.
+   * @param containerData
+   * @param conf
+   * @throws IOException
+   */
+  public static void removeContainerFromDB(KeyValueContainerData containerData,
+      ConfigurationSource conf) throws IOException {
+    try (DBHandle db = getDB(containerData, conf)) {
+      Preconditions.checkState(db.getStore()
+          instanceof DatanodeStoreSchemaThreeImpl);
+
+      ((DatanodeStoreSchemaThreeImpl) db.getStore()).dropAllWithPrefix(
+          containerData.getContainerID());
+    }
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerLocationUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerLocationUtil.java
index dde3e2e22d..13a5a69811 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerLocationUtil.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerLocationUtil.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.container.keyvalue.helpers;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.Storage;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 
 import java.io.File;
 
@@ -105,9 +106,12 @@ public final class KeyValueContainerLocationUtil {
   /**
    * Return containerDB File.
    */
-  public static File getContainerDBFile(File containerMetaDataPath,
-      long containerID) {
-    return new File(containerMetaDataPath, containerID + OzoneConsts
-        .DN_CONTAINER_DB);
+  public static File getContainerDBFile(KeyValueContainerData containerData) {
+    if (containerData.getSchemaVersion().equals(OzoneConsts.SCHEMA_V3)) {
+      return new File(containerData.getVolume().getDbParentDir(),
+          OzoneConsts.CONTAINER_DB_NAME);
+    }
+    return new File(containerData.getMetadataPath(),
+        containerData.getContainerID() + OzoneConsts.DN_CONTAINER_DB);
   }
 }
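
The new getContainerDBFile() above is the single place that encodes the layout difference: schema v3 resolves to one db under the volume's db parent directory, while older schemas resolve to a per-container db under the container's metadata path. A sketch of that resolution (the two file-name constants are assumptions standing in for OzoneConsts.CONTAINER_DB_NAME and OzoneConsts.DN_CONTAINER_DB):

    import java.io.File;

    public class DbFileSketch {
      static final String SCHEMA_V3 = "3";
      // Assumed values, for illustration only.
      static final String CONTAINER_DB_NAME = "container.db";
      static final String DN_CONTAINER_DB = "-dn-container.db";

      static File getContainerDBFile(String schemaVersion,
          File volumeDbParentDir, File metadataPath, long containerID) {
        if (SCHEMA_V3.equals(schemaVersion)) {
          // One shared db per volume.
          return new File(volumeDbParentDir, CONTAINER_DB_NAME);
        }
        // One db per container.
        return new File(metadataPath, containerID + DN_CONTAINER_DB);
      }
    }
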
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
index 85bacc9a31..b23a49556f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
@@ -109,6 +109,10 @@ public final class KeyValueContainerUtil {
     } else if (schemaVersion.equals(OzoneConsts.SCHEMA_V2)) {
       store = new DatanodeStoreSchemaTwoImpl(conf, dbFile.getAbsolutePath(),
           false);
+    } else if (schemaVersion.equals(OzoneConsts.SCHEMA_V3)) {
+      // We don't create a per-container store for schema v3 containers;
+      // they use the per-volume db store instead.
+      return;
     } else {
       throw new IllegalArgumentException(
               "Unrecognized schema version for container: " + schemaVersion);
@@ -138,8 +142,12 @@ public final class KeyValueContainerUtil {
         .getMetadataPath());
     File chunksPath = new File(containerData.getChunksPath());
 
-    // Close the DB connection and remove the DB handler from cache
-    BlockUtils.removeDB(containerData, conf);
+    if (containerData.getSchemaVersion().equals(OzoneConsts.SCHEMA_V3)) {
+      BlockUtils.removeContainerFromDB(containerData, conf);
+    } else {
+      // Close the DB connection and remove the DB handler from cache
+      BlockUtils.removeDB(containerData, conf);
+    }
 
     // Delete the Container MetaData path.
     FileUtils.deleteDirectory(containerMetaDataPath);
@@ -163,13 +171,18 @@ public final class KeyValueContainerUtil {
       ConfigurationSource config) throws IOException {
 
     long containerID = kvContainerData.getContainerID();
-    File metadataPath = new File(kvContainerData.getMetadataPath());
 
     // Verify Checksum
     ContainerUtils.verifyChecksum(kvContainerData, config);
 
+    if (kvContainerData.getSchemaVersion() == null) {
+      // If this container has not specified a schema version, it is in the old
+      // format with one default column family.
+      kvContainerData.setSchemaVersion(OzoneConsts.SCHEMA_V1);
+    }
+
     File dbFile = KeyValueContainerLocationUtil.getContainerDBFile(
-        metadataPath, containerID);
+        kvContainerData);
     if (!dbFile.exists()) {
       LOG.error("Container DB file is missing for ContainerID {}. " +
           "Skipping loading of this container.", containerID);
@@ -178,13 +191,13 @@ public final class KeyValueContainerUtil {
     }
     kvContainerData.setDbFile(dbFile);
 
-    if (kvContainerData.getSchemaVersion() == null) {
-      // If this container has not specified a schema version, it is in the old
-      // format with one default column family.
-      kvContainerData.setSchemaVersion(OzoneConsts.SCHEMA_V1);
+    if (kvContainerData.getSchemaVersion().equals(OzoneConsts.SCHEMA_V3)) {
+      try (DBHandle db = BlockUtils.getDB(kvContainerData, config)) {
+        populateContainerMetadata(kvContainerData, db.getStore());
+      }
+      return;
     }
 
-    boolean isBlockMetadataSet = false;
     DBHandle cachedDB = null;
     DatanodeStore store = null;
     try {
@@ -203,70 +216,7 @@ public final class KeyValueContainerUtil {
             "instance was retrieved from the cache. This should only happen " +
             "in tests");
       }
-      Table<String, Long> metadataTable = store.getMetadataTable();
-
-      // Set pending deleted block count.
-      Long pendingDeleteBlockCount =
-          metadataTable.get(kvContainerData.pendingDeleteBlockCountKey());
-      if (pendingDeleteBlockCount != null) {
-        kvContainerData.incrPendingDeletionBlocks(
-                pendingDeleteBlockCount);
-      } else {
-        // Set pending deleted block count.
-        MetadataKeyFilters.KeyPrefixFilter filter =
-                kvContainerData.getDeletingBlockKeyFilter();
-        int numPendingDeletionBlocks =
-            store.getBlockDataTable()
-            .getSequentialRangeKVs(kvContainerData.startKeyEmpty(),
-                Integer.MAX_VALUE, kvContainerData.containerPrefix(), filter)
-            .size();
-        kvContainerData.incrPendingDeletionBlocks(numPendingDeletionBlocks);
-      }
-
-      // Set delete transaction id.
-      Long delTxnId =
-          metadataTable.get(kvContainerData.latestDeleteTxnKey());
-      if (delTxnId != null) {
-        kvContainerData
-            .updateDeleteTransactionId(delTxnId);
-      }
-
-      // Set BlockCommitSequenceId.
-      Long bcsId = metadataTable.get(kvContainerData.bcsIdKey());
-      if (bcsId != null) {
-        kvContainerData
-            .updateBlockCommitSequenceId(bcsId);
-      }
-
-      // Set bytes used.
-      // commitSpace for Open Containers relies on usedBytes
-      Long bytesUsed =
-          metadataTable.get(kvContainerData.bytesUsedKey());
-      if (bytesUsed != null) {
-        isBlockMetadataSet = true;
-        kvContainerData.setBytesUsed(bytesUsed);
-      }
-
-      // Set block count.
-      Long blockCount = metadataTable.get(kvContainerData.blockCountKey());
-      if (blockCount != null) {
-        isBlockMetadataSet = true;
-        kvContainerData.setBlockCount(blockCount);
-      }
-      if (!isBlockMetadataSet) {
-        initializeUsedBytesAndBlockCount(store, kvContainerData);
-      }
-
-      // If the container is missing a chunks directory, possibly due to the
-      // bug fixed by HDDS-6235, create it here.
-      File chunksDir = new File(kvContainerData.getChunksPath());
-      if (!chunksDir.exists()) {
-        Files.createDirectories(chunksDir.toPath());
-      }
-      // Run advanced container inspection/repair operations if specified on
-      // startup. If this method is called but not as a part of startup,
-      // The inspectors will be unloaded and this will be a no-op.
-      ContainerInspectorUtil.process(kvContainerData, store);
+      populateContainerMetadata(kvContainerData, store);
     } finally {
       if (cachedDB != null) {
         // If we get a cached instance, calling close simply decrements the
@@ -287,6 +237,78 @@ public final class KeyValueContainerUtil {
     }
   }
 
+  private static void populateContainerMetadata(
+      KeyValueContainerData kvContainerData, DatanodeStore store)
+      throws IOException {
+    boolean isBlockMetadataSet = false;
+    Table<String, Long> metadataTable = store.getMetadataTable();
+
+    // Set pending deleted block count.
+    Long pendingDeleteBlockCount =
+        metadataTable.get(kvContainerData
+            .pendingDeleteBlockCountKey());
+    if (pendingDeleteBlockCount != null) {
+      kvContainerData.incrPendingDeletionBlocks(
+          pendingDeleteBlockCount);
+    } else {
+      // Set pending deleted block count.
+      MetadataKeyFilters.KeyPrefixFilter filter =
+          kvContainerData.getDeletingBlockKeyFilter();
+      int numPendingDeletionBlocks = store.getBlockDataTable()
+              .getSequentialRangeKVs(kvContainerData.startKeyEmpty(),
+                  Integer.MAX_VALUE, kvContainerData.containerPrefix(),
+                  filter).size();
+      kvContainerData.incrPendingDeletionBlocks(numPendingDeletionBlocks);
+    }
+
+    // Set delete transaction id.
+    Long delTxnId =
+        metadataTable.get(kvContainerData.latestDeleteTxnKey());
+    if (delTxnId != null) {
+      kvContainerData
+          .updateDeleteTransactionId(delTxnId);
+    }
+
+    // Set BlockCommitSequenceId.
+    Long bcsId = metadataTable.get(
+        kvContainerData.bcsIdKey());
+    if (bcsId != null) {
+      kvContainerData
+          .updateBlockCommitSequenceId(bcsId);
+    }
+
+    // Set bytes used.
+    // commitSpace for Open Containers relies on usedBytes
+    Long bytesUsed =
+        metadataTable.get(kvContainerData.bytesUsedKey());
+    if (bytesUsed != null) {
+      isBlockMetadataSet = true;
+      kvContainerData.setBytesUsed(bytesUsed);
+    }
+
+    // Set block count.
+    Long blockCount = metadataTable.get(
+        kvContainerData.blockCountKey());
+    if (blockCount != null) {
+      isBlockMetadataSet = true;
+      kvContainerData.setBlockCount(blockCount);
+    }
+    if (!isBlockMetadataSet) {
+      initializeUsedBytesAndBlockCount(store, kvContainerData);
+    }
+
+    // If the container is missing a chunks directory, possibly due to the
+    // bug fixed by HDDS-6235, create it here.
+    File chunksDir = new File(kvContainerData.getChunksPath());
+    if (!chunksDir.exists()) {
+      Files.createDirectories(chunksDir.toPath());
+    }
+    // Run advanced container inspection/repair operations if specified on
+    // startup. If this method is called outside of startup,
+    // the inspectors will be unloaded and this will be a no-op.
+    ContainerInspectorUtil.process(kvContainerData, store);
+  }
+
   /**
    * Initialize bytes used and block count.
    * @param kvData
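
The refactor above extracts populateContainerMetadata() so that both the v3 path (shared per-volume db) and the v1/v2 path (per-container db, possibly cached) feed the same metadata-loading code; containers with no recorded schema version default to v1. A self-contained sketch of that dispatch, with maps standing in for the stores:

    import java.util.HashMap;
    import java.util.Map;

    public class ParseSketch {
      static class ContainerData {
        String schemaVersion;
        long containerID;
      }

      // Stand-in for the db stores: one shared store per volume for v3,
      // one dedicated store per container for v1/v2.
      static final Map<String, Map<String, Long>> STORES = new HashMap<>();

      static Map<String, Long> openStore(ContainerData data) {
        if (data.schemaVersion == null) {
          // Pre-schema containers are in the old v1 format.
          data.schemaVersion = "1";
        }
        String dbKey = "3".equals(data.schemaVersion)
            ? "volume-db"
            : "container-db-" + data.containerID;
        return STORES.computeIfAbsent(dbKey, k -> new HashMap<>());
      }
    }
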
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java
index 45b5b08c48..5178d9b0ff 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.container.metadata;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
@@ -74,4 +75,15 @@ public class DatanodeStoreSchemaThreeImpl extends AbstractDatanodeStore {
         getBlockDataTableWithIterator()
             .iterator(getContainerKeyPrefix(containerID)), filter);
   }
+
+  public void dropAllWithPrefix(long containerID) throws IOException {
+    String prefix = getContainerKeyPrefix(containerID);
+    try (BatchOperation batch = getBatchHandler().initBatchOperation()) {
+      getMetadataTable().deleteBatchWithPrefix(batch, prefix);
+      getBlockDataTable().deleteBatchWithPrefix(batch, prefix);
+      getDeletedBlocksTable().deleteBatchWithPrefix(batch, prefix);
+      getDeleteTransactionTable().deleteBatchWithPrefix(batch, prefix);
+      getBatchHandler().commitBatchOperation(batch);
+    }
+  }
 }
\ No newline at end of file
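
dropAllWithPrefix() above is what replaces "delete the container's db" for schema v3: since the db is shared by the volume, removing a container means batch-deleting its key range from every table. A sketch of the same idea over sorted maps (stand-ins for the store's tables):

    import java.util.List;
    import java.util.TreeMap;

    public class DropPrefixSketch {
      // Each TreeMap stands in for one table of the shared store
      // (metadata, block data, deleted blocks, delete transactions).
      static void dropAllWithPrefix(String prefix,
          List<TreeMap<String, byte[]>> tables) {
        for (TreeMap<String, byte[]> table : tables) {
          // Restrict to the prefix range, then clear it in one pass,
          // mirroring the single batch operation in the real code.
          table.tailMap(prefix).keySet()
              .removeIf(k -> k.startsWith(prefix));
        }
      }
    }
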
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java
index 37eb58d02b..6254e1ecf7 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java
@@ -131,6 +131,12 @@ public class DatanodeTable<KEY, VALUE> implements Table<KEY, VALUE> {
     return table.getSequentialRangeKVs(startKey, count, prefix, filters);
   }
 
+  @Override
+  public void deleteBatchWithPrefix(BatchOperation batch, KEY prefix)
+      throws IOException {
+    table.deleteBatchWithPrefix(batch, prefix);
+  }
+
   @Override
   public void close() throws Exception {
     table.close();
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
index 405908de3e..a60f0d52ae 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
@@ -24,6 +24,7 @@ import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -37,6 +38,9 @@ import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfigurati
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
@@ -138,4 +142,19 @@ public final class ContainerTestUtils {
     dc.setContainerSchemaV3Enabled(false);
     conf.setFromObject(dc);
   }
+
+  public static void createDbInstancesForTestIfNeeded(
+      MutableVolumeSet hddsVolumeSet, String scmID, String clusterID,
+      ConfigurationSource conf) {
+    DatanodeConfiguration dc = conf.getObject(DatanodeConfiguration.class);
+    if (!dc.getContainerSchemaV3Enabled()) {
+      return;
+    }
+
+    for (HddsVolume volume : StorageVolumeUtil.getHddsVolumesList(
+        hddsVolumeSet.getVolumesList())) {
+      StorageVolumeUtil.checkVolume(volume, scmID, clusterID, conf,
+          null, null);
+    }
+  }
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
index 38e62976bd..08b5b3369e 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
@@ -33,7 +33,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.conf.MutableConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
@@ -42,6 +41,7 @@ import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
@@ -60,6 +60,7 @@ import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
@@ -76,7 +77,6 @@ import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.GenericTestUtils.LogCapturer;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 
-import static java.util.stream.Collectors.toList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
 
 import org.junit.AfterClass;
@@ -89,11 +89,11 @@ import org.junit.runners.Parameterized;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
-import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_VERSIONS;
 import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1;
 import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2;
 import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_BLOCK;
 import static org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask.LOG;
+import static org.junit.Assume.assumeFalse;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -109,45 +109,22 @@ public class TestBlockDeletingService {
   private static String scmId;
   private static String clusterID;
   private static String datanodeUuid;
-  private static MutableConfigurationSource conf;
+  private static OzoneConfiguration conf;
 
   private final ContainerLayoutVersion layout;
   private final String schemaVersion;
   private int blockLimitPerInterval;
   private static VolumeSet volumeSet;
 
-  public TestBlockDeletingService(LayoutInfo layoutInfo) {
-    this.layout = layoutInfo.layout;
-    this.schemaVersion = layoutInfo.schemaVersion;
+  public TestBlockDeletingService(ContainerTestVersionInfo versionInfo) {
+    this.layout = versionInfo.getLayout();
+    this.schemaVersion = versionInfo.getSchemaVersion();
+    ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
   }
 
   @Parameterized.Parameters
   public static Iterable<Object[]> parameters() {
-    return LayoutInfo.layoutList.stream().map(each -> new Object[] {each})
-        .collect(toList());
-  }
-
-  /**
-   * Bundles test parameters for TestBlockDeletingService.
-   */
-  public static class LayoutInfo {
-    private final String schemaVersion;
-    private final ContainerLayoutVersion layout;
-
-    public LayoutInfo(String schemaVersion, ContainerLayoutVersion layout) {
-      this.schemaVersion = schemaVersion;
-      this.layout = layout;
-    }
-
-    private static List<LayoutInfo> layoutList = new ArrayList<>();
-    static {
-      for (ContainerLayoutVersion ch :
-          ContainerLayoutVersion.getAllVersions()) {
-        for (String sch : SCHEMA_VERSIONS) {
-          layoutList.add(new LayoutInfo(sch, ch));
-        }
-      }
-    }
+    return ContainerTestVersionInfo.versionParameters();
   }
 
   @BeforeClass
@@ -404,6 +381,8 @@ public class TestBlockDeletingService {
 
   @Test
   public void testBlockDeletion() throws Exception {
+    assumeFalse(schemaVersion.equals(OzoneConsts.SCHEMA_V3));
+
     DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
     dnConf.setBlockDeletionLimit(2);
     this.blockLimitPerInterval = dnConf.getBlockDeletionLimit();
@@ -483,6 +462,8 @@ public class TestBlockDeletingService {
   @Test
   @SuppressWarnings("java:S2699") // waitFor => assertion with timeout
   public void testShutdownService() throws Exception {
+    assumeFalse(schemaVersion.equals(OzoneConsts.SCHEMA_V3));
+
     conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 500,
         TimeUnit.MILLISECONDS);
     conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10);
@@ -511,6 +492,8 @@ public class TestBlockDeletingService {
 
   @Test
   public void testBlockDeletionTimeout() throws Exception {
+    assumeFalse(schemaVersion.equals(OzoneConsts.SCHEMA_V3));
+
     DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
     dnConf.setBlockDeletionLimit(3);
     blockLimitPerInterval = dnConf.getBlockDeletionLimit();
@@ -598,6 +581,8 @@ public class TestBlockDeletingService {
   @Test(timeout = 30000)
   @org.junit.Ignore
   public void testContainerThrottle() throws Exception {
+    assumeFalse(schemaVersion.equals(OzoneConsts.SCHEMA_V3));
+
     // Properties :
     //  - Number of containers : 2
     //  - Number of blocks per container : 1
@@ -672,6 +657,8 @@ public class TestBlockDeletingService {
 
   @Test(timeout = 30000)
   public void testBlockThrottle() throws Exception {
+    assumeFalse(schemaVersion.equals(OzoneConsts.SCHEMA_V3));
+
     // Properties :
     //  - Number of containers : 5
     //  - Number of blocks per container : 3
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java
index 41fccb8bba..394231260c 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java
@@ -19,9 +19,10 @@
 package org.apache.hadoop.ozone.container.common;
 
 import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
-import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
+import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
 import org.junit.Test;
@@ -42,14 +43,19 @@ public class TestKeyValueContainerData {
   private static final long MAXSIZE = (long) StorageUnit.GB.toBytes(5);
 
   private final ContainerLayoutVersion layout;
-
-  public TestKeyValueContainerData(ContainerLayoutVersion layout) {
-    this.layout = layout;
+  private final String schemaVersion;
+  private final OzoneConfiguration conf;
+
+  public TestKeyValueContainerData(ContainerTestVersionInfo versionInfo) {
+    this.layout = versionInfo.getLayout();
+    this.schemaVersion = versionInfo.getSchemaVersion();
+    this.conf = new OzoneConfiguration();
+    ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
   }
 
   @Parameterized.Parameters
   public static Iterable<Object[]> parameters() {
-    return ContainerLayoutTestInfo.containerLayoutParameters();
+    return ContainerTestVersionInfo.versionParameters();
   }
 
   @Test
@@ -94,7 +100,7 @@ public class TestKeyValueContainerData {
     kvData.incrBlockCount();
     kvData.incrPendingDeletionBlocks(1);
     kvData.setSchemaVersion(
-        VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion());
+        VersionedDatanodeFeatures.SchemaV3.chooseSchemaVersion(conf));
 
     assertEquals(state, kvData.getState());
     assertEquals(containerDBType, kvData.getContainerDBType());
@@ -109,7 +115,7 @@ public class TestKeyValueContainerData {
     assertEquals(1, kvData.getNumPendingDeletionBlocks());
     assertEquals(pipelineId.toString(), kvData.getOriginPipelineId());
     assertEquals(datanodeId.toString(), kvData.getOriginNodeId());
-    assertEquals(VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(),
+    assertEquals(VersionedDatanodeFeatures.SchemaV3.chooseSchemaVersion(conf),
         kvData.getSchemaVersion());
   }
 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
index a5d35d3567..6e2e8654f3 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
@@ -52,6 +53,8 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.yaml.snakeyaml.Yaml;
 
 import java.io.File;
@@ -89,6 +92,7 @@ import static org.mockito.Mockito.when;
  * {@link TestDB}, which is used by these tests to load a pre created schema
  * version 1 RocksDB instance from test resources.
  */
+@RunWith(Parameterized.class)
 public class TestSchemaOneBackwardsCompatibility {
   private OzoneConfiguration conf;
 
@@ -98,9 +102,21 @@ public class TestSchemaOneBackwardsCompatibility {
   @Rule
   public TemporaryFolder tempFolder = new TemporaryFolder();
 
+  public TestSchemaOneBackwardsCompatibility(String schemaVersion) {
+    this.conf = new OzoneConfiguration();
+    ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> parameters() {
+    return Arrays.asList(new Object[][]{
+        {OzoneConsts.SCHEMA_V2},
+        {OzoneConsts.SCHEMA_V3}
+    });
+  }
+
   @Before
   public void setup() throws Exception {
-    conf = new OzoneConfiguration();
     TestDB testDB = new TestDB();
 
     // Copy data to the temporary folder so it can be safely modified.
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index 954e5be02f..09dd32b1d7 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -54,8 +54,7 @@ import org.apache.hadoop.ozone.container.common.transport.server.ratis.Dispatche
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
-import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
+import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
@@ -75,6 +74,8 @@ import static org.apache.hadoop.ozone.container.ContainerTestHelper.setDataCheck
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
+
+import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded;
 import static org.junit.Assert.fail;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -103,7 +104,7 @@ public class TestContainerPersistence {
   private static VolumeChoosingPolicy volumeChoosingPolicy;
 
   private ContainerSet containerSet;
-  private VolumeSet volumeSet;
+  private MutableVolumeSet volumeSet;
   private BlockManager blockManager;
   private ChunkManager chunkManager;
 
@@ -116,14 +117,17 @@ public class TestContainerPersistence {
   public Timeout testTimeout = Timeout.seconds(300);
 
   private final ContainerLayoutVersion layout;
+  private final String schemaVersion;
 
-  public TestContainerPersistence(ContainerLayoutVersion layout) {
-    this.layout = layout;
+  public TestContainerPersistence(ContainerTestVersionInfo versionInfo) {
+    this.layout = versionInfo.getLayout();
+    this.schemaVersion = versionInfo.getSchemaVersion();
+    ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
   }
 
   @Parameterized.Parameters
   public static Iterable<Object[]> parameters() {
-    return ContainerLayoutTestInfo.containerLayoutParameters();
+    return ContainerTestVersionInfo.versionParameters();
   }
 
   @BeforeClass
@@ -146,6 +150,7 @@ public class TestContainerPersistence {
     containerSet = new ContainerSet();
     volumeSet = new MutableVolumeSet(DATANODE_UUID, conf, null,
         StorageVolume.VolumeType.DATA_VOLUME, null);
+    createDbInstancesForTestIfNeeded(volumeSet, SCM_ID, SCM_ID, conf);
     blockManager = new BlockManagerImpl(conf);
     chunkManager = ChunkManagerFactory.createChunkManager(conf, blockManager,
         null);
@@ -158,6 +163,9 @@ public class TestContainerPersistence {
 
   @After
   public void cleanupDir() throws IOException {
+    // Cleanup cache
+    BlockUtils.shutdownCache(conf);
+
     // Clean up SCM metadata
     log.info("Deleting {}", hddsPath);
     FileUtils.deleteDirectory(new File(hddsPath));
@@ -265,6 +273,12 @@ public class TestContainerPersistence {
     Assert.assertFalse(containerSet.getContainerMapCopy()
         .containsKey(testContainerID1));
 
+    // With schema v3 there is no dedicated per-container DB,
+    // so skip the checks of behaviors that depend on one.
+    if (schemaVersion.equals(OzoneConsts.SCHEMA_V3)) {
+      return;
+    }
+
     // Adding block to a deleted container should fail.
     exception.expect(StorageContainerException.class);
     exception.expectMessage("Error opening DB.");
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/ContainerTestVersionInfo.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/ContainerTestVersionInfo.java
new file mode 100644
index 0000000000..7111374b2b
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/ContainerTestVersionInfo.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_VERSIONS;
+
+/**
+ * Class to hold version info for container data and metadata.
+ * - SchemaVersion: metadata schema version
+ * - ContainerLayoutVersion: data layout version
+ */
+public class ContainerTestVersionInfo {
+  private final String schemaVersion;
+  private final ContainerLayoutVersion layout;
+
+  public ContainerTestVersionInfo(String schemaVersion,
+      ContainerLayoutVersion layout) {
+    this.schemaVersion = schemaVersion;
+    this.layout = layout;
+  }
+
+  private static List<ContainerTestVersionInfo> layoutList = new ArrayList<>();
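+  // Pre-compute the cross product of all data layout versions and all
+  // metadata schema versions, so tests can run against every combination.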
+  static {
+    for (ContainerLayoutVersion ch : ContainerLayoutVersion.getAllVersions()) {
+      for (String sch : SCHEMA_VERSIONS) {
+        layoutList.add(new ContainerTestVersionInfo(sch, ch));
+      }
+    }
+  }
+
+  public String getSchemaVersion() {
+    return this.schemaVersion;
+  }
+
+  public ContainerLayoutVersion getLayout() {
+    return this.layout;
+  }
+
+  public static Iterable<Object[]> versionParameters() {
+    return layoutList.stream().map(each -> new Object[] {each})
+        .collect(toList());
+  }
+
+  public static void setTestSchemaVersion(String schemaVersion,
+      OzoneConfiguration conf) {
+    if (schemaVersion.equals(OzoneConsts.SCHEMA_V3)) {
+      ContainerTestUtils.enableSchemaV3(conf);
+    } else {
+      ContainerTestUtils.disableSchemaV3(conf);
+    }
+  }
+}
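
For reference, a minimal sketch of how a JUnit 4 test consumes this helper,
mirroring the parameterized tests updated throughout this patch (the class
name VersionedExampleTest is illustrative; imports are as in those tests):

    @RunWith(Parameterized.class)
    public class VersionedExampleTest {
      private final ContainerLayoutVersion layout;
      private final String schemaVersion;
      private final OzoneConfiguration conf = new OzoneConfiguration();

      public VersionedExampleTest(ContainerTestVersionInfo versionInfo) {
        this.layout = versionInfo.getLayout();
        this.schemaVersion = versionInfo.getSchemaVersion();
        // Flip the schema v3 flag on or off for this combination.
        ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
      }

      // Each Object[] wraps one layout/schema combination.
      @Parameterized.Parameters
      public static Iterable<Object[]> parameters() {
        return ContainerTestVersionInfo.versionParameters();
      }
    }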
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java
index 9c3cd19822..a68349c0aa 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.ozone.container.keyvalue;
 
 import java.io.File;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -50,10 +48,10 @@ import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.ozone.test.GenericTestUtils;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
-import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_BLOCK;
-import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_CHUNK;
 
 import org.junit.After;
+
+import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -77,27 +75,30 @@ public class TestKeyValueBlockIterator {
   private File testRoot;
   private DBHandle db;
   private final ContainerLayoutVersion layout;
-
-  public TestKeyValueBlockIterator(ContainerLayoutVersion layout) {
-    this.layout = layout;
+  private String schemaVersion;
+  private String datanodeID = UUID.randomUUID().toString();
+  private String clusterID = UUID.randomUUID().toString();
+
+  public TestKeyValueBlockIterator(ContainerTestVersionInfo versionInfo) {
+    this.layout = versionInfo.getLayout();
+    this.schemaVersion = versionInfo.getSchemaVersion();
+    this.conf = new OzoneConfiguration();
+    ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
   }
 
   @Parameterized.Parameters
-  public static Collection<Object[]> data() {
-    return Arrays.asList(new Object[][] {
-        {FILE_PER_CHUNK},
-        {FILE_PER_BLOCK}
-    });
+  public static Iterable<Object[]> data() {
+    return ContainerTestVersionInfo.versionParameters();
   }
 
   @Before
   public void setUp() throws Exception {
     testRoot = GenericTestUtils.getRandomizedTestDir();
-    conf = new OzoneConfiguration();
     conf.set(HDDS_DATANODE_DIR_KEY, testRoot.getAbsolutePath());
     conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testRoot.getAbsolutePath());
-    volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf, null,
+    volumeSet = new MutableVolumeSet(datanodeID, clusterID, conf, null,
         StorageVolume.VolumeType.DATA_VOLUME, null);
+    createDbInstancesForTestIfNeeded(volumeSet, clusterID, clusterID, conf);
 
     containerData = new KeyValueContainerData(105L,
             layout,
@@ -105,8 +106,8 @@ public class TestKeyValueBlockIterator {
             UUID.randomUUID().toString());
     // Init the container.
     container = new KeyValueContainer(containerData, conf);
-    container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(), UUID
-            .randomUUID().toString());
+    container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
+        clusterID);
     db = BlockUtils.getDB(containerData, conf);
   }
 
@@ -115,6 +116,7 @@ public class TestKeyValueBlockIterator {
   public void tearDown() throws Exception {
     db.close();
     db.cleanup();
+    BlockUtils.shutdownCache(conf);
     volumeSet.shutdown();
     FileUtil.fullyDelete(testRoot);
   }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
index 5be7627a12..669401896b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
@@ -85,6 +85,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeFalse;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
@@ -106,19 +107,22 @@ public class TestKeyValueContainer {
   private UUID datanodeId;
 
   private final ContainerLayoutVersion layout;
+  private String schemaVersion;
 
   // Use one configuration object across parameterized runs of tests.
   // This preserves the column family options in the container options
   // cache for testContainersShareColumnFamilyOptions.
   private static final OzoneConfiguration CONF = new OzoneConfiguration();
 
-  public TestKeyValueContainer(ContainerLayoutVersion layout) {
-    this.layout = layout;
+  public TestKeyValueContainer(ContainerTestVersionInfo versionInfo) {
+    this.layout = versionInfo.getLayout();
+    this.schemaVersion = versionInfo.getSchemaVersion();
+    ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, CONF);
   }
 
   @Parameterized.Parameters
   public static Iterable<Object[]> parameters() {
-    return ContainerLayoutTestInfo.containerLayoutParameters();
+    return ContainerTestVersionInfo.versionParameters();
   }
 
   @Before
@@ -127,6 +131,7 @@ public class TestKeyValueContainer {
     HddsVolume hddsVolume = new HddsVolume.Builder(folder.getRoot()
         .getAbsolutePath()).conf(CONF).datanodeUuid(datanodeId
         .toString()).build();
+    StorageVolumeUtil.checkVolume(hddsVolume, scmId, scmId, CONF, null, null);
 
     volumeSet = mock(MutableVolumeSet.class);
     volumeChoosingPolicy = mock(RoundRobinVolumeChoosingPolicy.class);
@@ -180,6 +185,8 @@ public class TestKeyValueContainer {
 
   @Test
   public void testEmptyContainerImportExport() throws Exception {
+    assumeFalse(schemaVersion.equals(OzoneConsts.SCHEMA_V3));
+
     createContainer();
     closeContainer();
 
@@ -209,6 +216,8 @@ public class TestKeyValueContainer {
 
   @Test
   public void testContainerImportExport() throws Exception {
+    assumeFalse(schemaVersion.equals(OzoneConsts.SCHEMA_V3));
+
     long containerId = keyValueContainer.getContainerData().getContainerID();
     createContainer();
     long numberOfKeysToWrite = 12;
@@ -235,13 +244,13 @@ public class TestKeyValueContainer {
             keyValueContainerData.getLayoutVersion(),
             keyValueContainerData.getMaxSize(), UUID.randomUUID().toString(),
             datanodeId.toString());
+    containerData.setSchemaVersion(keyValueContainerData.getSchemaVersion());
     KeyValueContainer container = new KeyValueContainer(containerData, CONF);
 
     HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(
         StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()), 1);
-    String hddsVolumeDir = containerVolume.getHddsRootDir().toString();
 
-    container.populatePathFields(scmId, containerVolume, hddsVolumeDir);
+    container.populatePathFields(scmId, containerVolume);
     try (FileInputStream fis = new FileInputStream(folderToExport)) {
       container.importContainerData(fis, packer);
     }
@@ -277,12 +286,12 @@ public class TestKeyValueContainer {
             keyValueContainerData.getLayoutVersion(),
             keyValueContainerData.getMaxSize(), UUID.randomUUID().toString(),
             datanodeId.toString());
+    containerData.setSchemaVersion(keyValueContainerData.getSchemaVersion());
     container = new KeyValueContainer(containerData, CONF);
 
     containerVolume = volumeChoosingPolicy.chooseVolume(
         StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()), 1);
-    hddsVolumeDir = containerVolume.getHddsRootDir().toString();
-    container.populatePathFields(scmId, containerVolume, hddsVolumeDir);
+    container.populatePathFields(scmId, containerVolume);
     try {
       FileInputStream fis = new FileInputStream(folderToExport);
       fis.close();
@@ -352,6 +361,8 @@ public class TestKeyValueContainer {
 
   @Test
   public void concurrentExport() throws Exception {
+    assumeFalse(schemaVersion.equals(OzoneConsts.SCHEMA_V3));
+
     createContainer();
     populate(100);
     closeContainer();
@@ -427,8 +438,13 @@ public class TestKeyValueContainer {
 
     assertFalse("Container File still exists",
         keyValueContainer.getContainerFile().exists());
-    assertFalse("Container DB file still exists",
-        keyValueContainer.getContainerDBFile().exists());
+
+    if (schemaVersion.equals(OzoneConsts.SCHEMA_V3)) {
+      // Schema v3 containers share one DB per volume, so the DB file
+      // must survive the deletion of a single container.
+      assertTrue("Shared DB file should still exist",
+          keyValueContainer.getContainerDBFile().exists());
+    } else {
+      assertFalse("Container DB file still exists",
+          keyValueContainer.getContainerDBFile().exists());
+    }
   }
 
   @Test
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
index a37684f2e6..b5694d7abb 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
@@ -47,9 +47,8 @@ import static org.junit.Assert.assertFalse;
 public class TestKeyValueContainerCheck
     extends TestKeyValueContainerIntegrityChecks {
 
-  public TestKeyValueContainerCheck(ContainerLayoutTestInfo
-      containerLayoutTestInfo) {
-    super(containerLayoutTestInfo);
+  public TestKeyValueContainerCheck(ContainerTestVersionInfo versionInfo) {
+    super(versionInfo);
   }
 
   /**
@@ -71,7 +70,7 @@ public class TestKeyValueContainerCheck
 
     KeyValueContainerCheck kvCheck =
         new KeyValueContainerCheck(containerData.getMetadataPath(), conf,
-            containerID);
+            containerID, containerData.getVolume());
 
    // first run checks on an Open Container
     boolean valid = kvCheck.fastCheck();
@@ -106,11 +105,10 @@ public class TestKeyValueContainerCheck
 
     KeyValueContainerCheck kvCheck =
         new KeyValueContainerCheck(containerData.getMetadataPath(), conf,
-            containerID);
+            containerID, containerData.getVolume());
 
-    File metaDir = new File(containerData.getMetadataPath());
     File dbFile = KeyValueContainerLocationUtil
-        .getContainerDBFile(metaDir, containerID);
+        .getContainerDBFile(containerData);
     containerData.setDbFile(dbFile);
     try (DBHandle ignored = BlockUtils.getDB(containerData, conf);
         BlockIterator<BlockData> kvIter =
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerIntegrityChecks.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerIntegrityChecks.java
index c431903d4b..51e72839fa 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerIntegrityChecks.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerIntegrityChecks.java
@@ -43,15 +43,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.util.Arrays;
 import java.util.ArrayList;
 import java.nio.ByteBuffer;
-import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
+import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded;
 import static org.junit.Assert.assertNotNull;
 
 /**
@@ -67,36 +66,44 @@ public class TestKeyValueContainerIntegrityChecks {
   private OzoneConfiguration conf;
   private File testRoot;
   private ChunkManager chunkManager;
+  private String datanodeID = UUID.randomUUID().toString();
+  private String clusterID = UUID.randomUUID().toString();
 
   protected static final int UNIT_LEN = 1024;
   protected static final int CHUNK_LEN = 3 * UNIT_LEN;
   protected static final int CHUNKS_PER_BLOCK = 4;
 
-  public TestKeyValueContainerIntegrityChecks(ContainerLayoutTestInfo
-      containerLayoutTestInfo) {
-    this.containerLayoutTestInfo = containerLayoutTestInfo;
+  public TestKeyValueContainerIntegrityChecks(
+      ContainerTestVersionInfo versionInfo) {
+    this.conf = new OzoneConfiguration();
+    ContainerTestVersionInfo.setTestSchemaVersion(
+        versionInfo.getSchemaVersion(), conf);
+    if (versionInfo.getLayout()
+        .equals(ContainerLayoutVersion.FILE_PER_BLOCK)) {
+      containerLayoutTestInfo = ContainerLayoutTestInfo.FILE_PER_BLOCK;
+    } else {
+      containerLayoutTestInfo = ContainerLayoutTestInfo.FILE_PER_CHUNK;
+    }
   }
 
-  @Parameterized.Parameters public static Collection<Object[]> data() {
-    return Arrays.asList(new Object[][] {
-        {ContainerLayoutTestInfo.FILE_PER_CHUNK},
-        {ContainerLayoutTestInfo.FILE_PER_BLOCK}
-    });
+  @Parameterized.Parameters public static Iterable<Object[]> data() {
+    return ContainerTestVersionInfo.versionParameters();
   }
 
   @Before public void setUp() throws Exception {
     LOG.info("Testing  layout:{}", containerLayoutTestInfo.getLayout());
     this.testRoot = GenericTestUtils.getRandomizedTestDir();
-    conf = new OzoneConfiguration();
     conf.set(HDDS_DATANODE_DIR_KEY, testRoot.getAbsolutePath());
     conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testRoot.getAbsolutePath());
     containerLayoutTestInfo.updateConfig(conf);
-    volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf, null,
-        StorageVolume.VolumeType.DATA_VOLUME, null);
+    volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), clusterID,
+        conf, null, StorageVolume.VolumeType.DATA_VOLUME, null);
+    createDbInstancesForTestIfNeeded(volumeSet, clusterID, clusterID, conf);
     chunkManager = containerLayoutTestInfo.createChunkManager(true, null);
   }
 
   @After public void teardown() {
+    BlockUtils.shutdownCache(conf);
     volumeSet.shutdown();
     FileUtil.fullyDelete(testRoot);
   }
@@ -138,7 +145,7 @@ public class TestKeyValueContainerIntegrityChecks {
         UUID.randomUUID().toString(), UUID.randomUUID().toString());
     KeyValueContainer container = new KeyValueContainer(containerData, conf);
     container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
-        UUID.randomUUID().toString());
+        clusterID);
     try (DBHandle metadataStore = BlockUtils.getDB(containerData,
         conf)) {
       assertNotNull(containerData.getChunksPath());
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMetadataInspector.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMetadataInspector.java
index a2a6d6bac7..8195e6f4eb 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMetadataInspector.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMetadataInspector.java
@@ -44,9 +44,9 @@ public class TestKeyValueContainerMetadataInspector
     extends TestKeyValueContainerIntegrityChecks {
   private static final long CONTAINER_ID = 102;
 
-  public TestKeyValueContainerMetadataInspector(ContainerLayoutTestInfo
-      containerLayoutTestInfo) {
-    super(containerLayoutTestInfo);
+  public TestKeyValueContainerMetadataInspector(
+      ContainerTestVersionInfo versionInfo) {
+    super(versionInfo);
   }
 
   @Test
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
index 793aea5122..c4fe8d9574 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
@@ -36,6 +36,7 @@ import org.apache.commons.compress.archivers.ArchiveOutputStream;
 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 import org.apache.commons.compress.compressors.CompressorOutputStream;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
 
@@ -56,6 +57,7 @@ import org.junit.runners.Parameterized;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.commons.compress.compressors.CompressorStreamFactory.GZIP;
+import static org.junit.Assume.assumeFalse;
 
 /**
  * Test the tar/untar for a given container.
@@ -88,14 +90,19 @@ public class TestTarContainerPacker {
   private static final AtomicInteger CONTAINER_ID = new AtomicInteger(1);
 
   private final ContainerLayoutVersion layout;
-
-  public TestTarContainerPacker(ContainerLayoutVersion layout) {
-    this.layout = layout;
+  private final String schemaVersion;
+  private OzoneConfiguration conf;
+
+  public TestTarContainerPacker(ContainerTestVersionInfo versionInfo) {
+    this.layout = versionInfo.getLayout();
+    this.schemaVersion = versionInfo.getSchemaVersion();
+    this.conf = new OzoneConfiguration();
+    ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
   }
 
   @Parameterized.Parameters
   public static Iterable<Object[]> parameters() {
-    return ContainerLayoutTestInfo.containerLayoutParameters();
+    return ContainerTestVersionInfo.versionParameters();
   }
 
   @BeforeClass
@@ -140,10 +147,9 @@ public class TestTarContainerPacker {
 
   @Test
   public void pack() throws IOException, CompressorException {
+    assumeFalse(schemaVersion.equals(OzoneConsts.SCHEMA_V3));
 
     //GIVEN
-    OzoneConfiguration conf = new OzoneConfiguration();
-
     KeyValueContainerData sourceContainerData =
         createContainer(SOURCE_CONTAINER_ROOT);
 
@@ -226,6 +232,8 @@ public class TestTarContainerPacker {
   @Test
   public void unpackContainerDataWithValidRelativeDbFilePath()
       throws Exception {
+    assumeFalse(schemaVersion.equals(OzoneConsts.SCHEMA_V3));
+
     //GIVEN
     KeyValueContainerData sourceContainerData =
         createContainer(SOURCE_CONTAINER_ROOT);
@@ -246,6 +254,8 @@ public class TestTarContainerPacker {
   @Test
   public void unpackContainerDataWithValidRelativeChunkFilePath()
       throws Exception {
+    assumeFalse(schemaVersion.equals(OzoneConsts.SCHEMA_V3));
+
     //GIVEN
     KeyValueContainerData sourceContainerData =
         createContainer(SOURCE_CONTAINER_ROOT);
@@ -266,6 +276,8 @@ public class TestTarContainerPacker {
   @Test
   public void unpackContainerDataWithInvalidRelativeDbFilePath()
       throws Exception {
+    assumeFalse(schemaVersion.equals(OzoneConsts.SCHEMA_V3));
+
     //GIVEN
     KeyValueContainerData sourceContainerData =
         createContainer(SOURCE_CONTAINER_ROOT);
@@ -283,6 +295,8 @@ public class TestTarContainerPacker {
   @Test
   public void unpackContainerDataWithInvalidRelativeChunkFilePath()
       throws Exception {
+    assumeFalse(schemaVersion.equals(OzoneConsts.SCHEMA_V3));
+
     //GIVEN
     KeyValueContainerData sourceContainerData =
         createContainer(SOURCE_CONTAINER_ROOT);
@@ -300,7 +314,6 @@ public class TestTarContainerPacker {
   private KeyValueContainerData unpackContainerData(File containerFile)
       throws IOException {
     try (FileInputStream input = new FileInputStream(containerFile)) {
-      OzoneConfiguration conf = new OzoneConfiguration();
       KeyValueContainerData data = createContainer(DEST_CONTAINER_ROOT);
       KeyValueContainer container = new KeyValueContainer(data, conf);
       packer.unpackContainerData(container, input);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java
index 5773eb38c8..377c7804bb 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java
@@ -26,13 +26,16 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
-import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
+import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -73,23 +76,28 @@ public class TestBlockManagerImpl {
   private BlockID blockID1;
 
   private final ContainerLayoutVersion layout;
+  private final String schemaVersion;
 
-  public TestBlockManagerImpl(ContainerLayoutVersion layout) {
-    this.layout = layout;
+  public TestBlockManagerImpl(ContainerTestVersionInfo versionInfo) {
+    this.layout = versionInfo.getLayout();
+    this.schemaVersion = versionInfo.getSchemaVersion();
+    this.config = new OzoneConfiguration();
+    ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, config);
   }
 
   @Parameterized.Parameters
   public static Iterable<Object[]> parameters() {
-    return ContainerLayoutTestInfo.containerLayoutParameters();
+    return ContainerTestVersionInfo.versionParameters();
   }
 
   @Before
   public void setUp() throws Exception {
-    config = new OzoneConfiguration();
     UUID datanodeId = UUID.randomUUID();
     HddsVolume hddsVolume = new HddsVolume.Builder(folder.getRoot()
         .getAbsolutePath()).conf(config).datanodeUuid(datanodeId
         .toString()).build();
+    StorageVolumeUtil.checkVolume(hddsVolume, scmId, scmId, config,
+        null, null);
 
     volumeSet = mock(MutableVolumeSet.class);
 
@@ -137,6 +145,11 @@ public class TestBlockManagerImpl {
 
   }
 
+  @After
+  public void cleanup() {
+    BlockUtils.shutdownCache(config);
+  }
+
   @Test
   public void testPutBlock() throws Exception {
     assertEquals(0, keyValueContainer.getContainerData().getBlockCount());
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
index 0c70e79c39..2ab6f95cf6 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
@@ -33,18 +33,23 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
 import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.mockito.Mockito;
 
 import java.io.File;
@@ -52,6 +57,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
@@ -59,6 +65,7 @@ import static org.mockito.Mockito.mock;
 /**
  * Test ContainerReader class which loads containers from disks.
  */
+@RunWith(Parameterized.class)
 public class TestContainerReader {
 
   @Rule
@@ -76,18 +83,35 @@ public class TestContainerReader {
   private int blockCount = 10;
   private long blockLen = 1024;
 
+  private final ContainerLayoutVersion layout;
+  private String schemaVersion;
+
+  public TestContainerReader(ContainerTestVersionInfo versionInfo) {
+    this.layout = versionInfo.getLayout();
+    this.schemaVersion = versionInfo.getSchemaVersion();
+    this.conf = new OzoneConfiguration();
+    ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> parameters() {
+    return ContainerTestVersionInfo.versionParameters();
+  }
+
   @Before
   public void setup() throws Exception {
 
     File volumeDir = tempDir.newFolder();
     volumeSet = Mockito.mock(MutableVolumeSet.class);
     containerSet = new ContainerSet();
-    conf = new OzoneConfiguration();
 
     datanodeId = UUID.randomUUID();
     hddsVolume = new HddsVolume.Builder(volumeDir
         .getAbsolutePath()).conf(conf).datanodeUuid(datanodeId
         .toString()).clusterID(clusterId).build();
+    StorageVolumeUtil.checkVolume(hddsVolume, clusterId, clusterId, conf,
+        null, null);
 
     volumeSet = mock(MutableVolumeSet.class);
     volumeChoosingPolicy = mock(RoundRobinVolumeChoosingPolicy.class);
@@ -96,7 +120,7 @@ public class TestContainerReader {
 
     for (int i = 0; i < 2; i++) {
       KeyValueContainerData keyValueContainerData = new KeyValueContainerData(i,
-          ContainerLayoutVersion.FILE_PER_BLOCK,
+          layout,
           (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
           datanodeId.toString());
 
@@ -114,13 +138,17 @@ public class TestContainerReader {
         blkNames = addBlocks(keyValueContainer, false);
         markBlocksForDelete(keyValueContainer, false, blkNames, i);
       }
-      // Close the RocksDB instance for this container and remove from the cache
-      // so it does not affect the ContainerReader, which avoids using the cache
-      // at startup for performance reasons.
-      BlockUtils.removeDB(keyValueContainerData, conf);
     }
+    // Close the RocksDB instances for these containers and remove them from
+    // the cache so they do not affect the ContainerReader, which avoids using
+    // the cache at startup for performance reasons.
+    ContainerCache.getInstance(conf).shutdownCache();
   }
 
+  @After
+  public void cleanup() {
+    BlockUtils.shutdownCache(conf);
+  }
 
   private void markBlocksForDelete(KeyValueContainer keyValueContainer,
       boolean setMetaData, List<Long> blockNames, int count) throws Exception {
@@ -228,6 +256,8 @@ public class TestContainerReader {
     hddsVolume1 = new HddsVolume.Builder(volumeDir1
         .getAbsolutePath()).conf(conf).datanodeUuid(datanode
         .toString()).clusterID(clusterId).build();
+    StorageVolumeUtil.checkVolume(hddsVolume1, clusterId, clusterId, conf,
+        null, null);
     volumeChoosingPolicy1 = mock(RoundRobinVolumeChoosingPolicy.class);
     Mockito.when(volumeChoosingPolicy1.chooseVolume(anyList(), anyLong()))
         .thenReturn(hddsVolume1);
@@ -235,13 +265,12 @@ public class TestContainerReader {
     int containerCount = 3;
     for (int i = 0; i < containerCount; i++) {
       KeyValueContainerData keyValueContainerData = new KeyValueContainerData(i,
-          ContainerLayoutVersion.FILE_PER_BLOCK,
+          layout,
           (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
           datanodeId.toString());
       KeyValueContainer keyValueContainer =
           new KeyValueContainer(keyValueContainerData, conf);
       keyValueContainer.create(volumeSet1, volumeChoosingPolicy1, clusterId);
-      BlockUtils.removeDB(keyValueContainerData, conf);
 
       if (i == 0) {
         // rename first container directory name
@@ -252,6 +281,7 @@ public class TestContainerReader {
         Assert.assertTrue(containerPath.renameTo(new File(renamePath)));
       }
     }
+    ContainerCache.getInstance(conf).shutdownCache();
 
     ContainerReader containerReader = new ContainerReader(volumeSet1,
         hddsVolume1, containerSet1, conf);
@@ -268,6 +298,8 @@ public class TestContainerReader {
       volumeDirs[i] = tempDir.newFolder();
       datanodeDirs = datanodeDirs.append(volumeDirs[i]).append(",");
     }
+
+    BlockUtils.shutdownCache(conf);
     conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
         datanodeDirs.toString());
     conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
@@ -275,8 +307,9 @@ public class TestContainerReader {
     MutableVolumeSet volumeSets =
         new MutableVolumeSet(datanodeId.toString(), clusterId, conf, null,
             StorageVolume.VolumeType.DATA_VOLUME, null);
+    createDbInstancesForTestIfNeeded(volumeSets, clusterId, clusterId, conf);
     ContainerCache cache = ContainerCache.getInstance(conf);
-    cache.clear();
+    cache.shutdownCache();
 
     RoundRobinVolumeChoosingPolicy policy =
         new RoundRobinVolumeChoosingPolicy();
@@ -285,7 +318,7 @@ public class TestContainerReader {
     blockCount = containerCount;
     for (int i = 0; i < containerCount; i++) {
       KeyValueContainerData keyValueContainerData =
-          new KeyValueContainerData(i, ContainerLayoutVersion.FILE_PER_BLOCK,
+          new KeyValueContainerData(i, layout,
               (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
               datanodeId.toString());
 
@@ -302,11 +335,11 @@ public class TestContainerReader {
         blkNames = addBlocks(keyValueContainer, false);
         markBlocksForDelete(keyValueContainer, false, blkNames, i);
       }
-      // Close the RocksDB instance for this container and remove from the cache
-      // so it does not affect the ContainerReader, which avoids using the cache
-      // at startup for performance reasons.
-      BlockUtils.removeDB(keyValueContainerData, conf);
     }
+    // Close the RocksDB instances for these containers and remove them from
+    // the cache so they do not affect the ContainerReader, which avoids using
+    // the cache at startup for performance reasons.
+    cache.shutdownCache();
 
     List<StorageVolume> volumes = volumeSets.getVolumesList();
     ContainerReader[] containerReaders = new ContainerReader[volumeNum];
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index 92ebe0a7f2..5ae3bad21f 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
-import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
+import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
@@ -68,6 +68,7 @@ import java.util.List;
 import java.util.ArrayList;
 
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DISK_OUT_OF_SPACE;
+import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -93,31 +94,37 @@ public class TestOzoneContainer {
   private final int numTestContainers = 10;
 
   private final ContainerLayoutVersion layout;
+  private final String schemaVersion;
 
-  public TestOzoneContainer(ContainerLayoutVersion layout) {
-    this.layout = layout;
+  public TestOzoneContainer(ContainerTestVersionInfo versionInfo) {
+    this.layout = versionInfo.getLayout();
+    this.schemaVersion = versionInfo.getSchemaVersion();
+    this.conf = new OzoneConfiguration();
+    ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
   }
 
   @Parameterized.Parameters
   public static Iterable<Object[]> parameters() {
-    return ContainerLayoutTestInfo.containerLayoutParameters();
+    return ContainerTestVersionInfo.versionParameters();
   }
 
   @Before
   public void setUp() throws Exception {
-    conf = new OzoneConfiguration();
     conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, folder.getRoot()
         .getAbsolutePath());
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
         folder.newFolder().getAbsolutePath());
     commitSpaceMap = new HashMap<String, Long>();
-    volumeSet = new MutableVolumeSet(datanodeDetails.getUuidString(), conf,
-        null, StorageVolume.VolumeType.DATA_VOLUME, null);
+    volumeSet = new MutableVolumeSet(datanodeDetails.getUuidString(),
+        clusterId, conf, null, StorageVolume.VolumeType.DATA_VOLUME, null);
+    createDbInstancesForTestIfNeeded(volumeSet, clusterId, clusterId, conf);
     volumeChoosingPolicy = new RoundRobinVolumeChoosingPolicy();
   }
 
   @After
   public void cleanUp() throws Exception {
+    BlockUtils.shutdownCache(conf);
+
     if (volumeSet != null) {
       volumeSet.shutdown();
       volumeSet = null;
@@ -159,8 +166,8 @@ public class TestOzoneContainer {
       Preconditions.checkState(freeBytes >= 0);
       commitSpaceMap.put(getVolumeKey(myVolume),
           Long.valueOf(volCommitBytes + freeBytes));
-      BlockUtils.removeDB(keyValueContainerData, conf);
     }
+    BlockUtils.shutdownCache(conf);
 
     DatanodeStateMachine stateMachine = Mockito.mock(
         DatanodeStateMachine.class);
@@ -244,7 +251,7 @@ public class TestOzoneContainer {
         StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList());
     // Format the volumes
     for (HddsVolume volume : volumes) {
-      volume.format(UUID.randomUUID().toString());
+      volume.format(clusterId);
 
       // eat up all available space except size of 1 container
       volume.incCommittedBytes(volume.getAvailable() - containerSize);
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
index c011cfe6a6..81b8872a82 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
@@ -260,6 +260,16 @@ class RDBTable implements Table<byte[], byte[]> {
     return getRangeKVs(startKey, count, true, prefix, filters);
   }
 
+  @Override
+  public void deleteBatchWithPrefix(BatchOperation batch, byte[] prefix)
+      throws IOException {
+    try (TableIterator<byte[], ByteArrayKeyValue> iter = iterator(prefix)) {
+      while (iter.hasNext()) {
+        deleteWithBatch(batch, iter.next().getKey());
+      }
+    }
+  }
+
   private List<ByteArrayKeyValue> getRangeKVs(byte[] startKey,
       int count, boolean sequential, byte[] prefix,
       MetadataKeyFilters.MetadataKeyFilter... filters)
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
index bdaaa966a3..b99e1d24ec 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
@@ -271,6 +271,16 @@ public interface Table<KEY, VALUE> extends AutoCloseable {
           MetadataKeyFilters.MetadataKeyFilter... filters)
           throws IOException, IllegalArgumentException;
 
+  /**
+   * Deletes all keys with the specified prefix from the metadata store
+   * as part of a batch operation.
+   * @param batch the batch operation to add the deletions to
+   * @param prefix the key prefix; every key starting with it is deleted
+   */
+  void deleteBatchWithPrefix(BatchOperation batch, KEY prefix)
+      throws IOException;
+
   /**
    * Class used to represent the key and value pair of a db entry.
    */
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index 3316536cc7..6469bfe911 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -374,6 +374,12 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
     return rangeKVs;
   }
 
+  @Override
+  public void deleteBatchWithPrefix(BatchOperation batch, KEY prefix)
+      throws IOException {
+    rawTable.deleteBatchWithPrefix(batch, codecRegistry.asRawData(prefix));
+  }
+
   @Override
   public void cleanupCache(List<Long> epochs) {
     cache.cleanup(epochs);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org