Posted to commits@ozone.apache.org by sh...@apache.org on 2020/03/31 05:57:03 UTC

[hadoop-ozone] branch master updated: HDDS-2943. Parameterize unit tests for chunk manager implementation (#694)

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 33ac261  HDDS-2943. Parameterize unit tests for chunk manager implementation (#694)
33ac261 is described below

commit 33ac261803fbd7d19d3acca58d1ee6e7b0888db1
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Tue Mar 31 07:56:55 2020 +0200

    HDDS-2943. Parameterize unit tests for chunk manager implementation (#694)
---
 .../container/common/impl/ChunkLayOutVersion.java  |  53 +++-
 .../ozone/container/common/impl/ContainerData.java |  19 ++
 .../container/keyvalue/KeyValueContainerCheck.java | 127 ++++----
 .../container/keyvalue/KeyValueContainerData.java  |  19 --
 .../container/keyvalue/helpers/ChunkUtils.java     |  39 +--
 .../keyvalue/impl/FilePerBlockStrategy.java        |  23 +-
 .../keyvalue/impl/FilePerChunkStrategy.java        |  25 +-
 .../keyvalue/interfaces/ChunkManager.java          |   1 -
 .../ozone/container/common/ContainerTestUtils.java |   3 +-
 .../container/common/TestBlockDeletingService.java |  17 +-
 .../common/TestKeyValueContainerData.java          |  18 +-
 .../common/impl/TestContainerDataYaml.java         |  21 +-
 .../impl/TestContainerDeletionChoosingPolicy.java  |  21 +-
 .../common/impl/TestContainerPersistence.java      |  98 +------
 .../container/common/impl/TestContainerSet.java    |  23 +-
 .../container/common/impl/TestHddsDispatcher.java  |  19 +-
 .../TestCloseContainerCommandHandler.java          |  21 +-
 .../common/volume/TestHddsVolumeChecker.java       |  40 ++-
 .../container/keyvalue/ChunkLayoutTestInfo.java    | 120 ++++++++
 .../keyvalue/TestKeyValueBlockIterator.java        |  15 +-
 .../container/keyvalue/TestKeyValueContainer.java  |  23 +-
 .../keyvalue/TestKeyValueContainerCheck.java       |  76 ++---
 .../TestKeyValueContainerMarkUnhealthy.java        |  16 +-
 .../container/keyvalue/TestKeyValueHandler.java    |  49 ++--
 .../container/keyvalue/TestTarContainerPacker.java |  18 +-
 .../keyvalue/impl/AbstractTestChunkManager.java    | 170 +++++++++++
 .../keyvalue/impl/CommonChunkManagerTestCases.java | 201 +++++++++++++
 .../keyvalue/impl/TestBlockManagerImpl.java        |  17 +-
 .../container/keyvalue/impl/TestChunkManager.java  | 325 ---------------------
 .../keyvalue/impl/TestChunkManagerDummyImpl.java   |  60 ++++
 .../keyvalue/impl/TestFilePerBlockStrategy.java    | 140 +++++++++
 .../keyvalue/impl/TestFilePerChunkStrategy.java    |  92 ++++++
 .../container/ozoneimpl/TestOzoneContainer.java    |  19 +-
 .../replication/TestReplicationSupervisor.java     |  32 +-
 34 files changed, 1307 insertions(+), 653 deletions(-)
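
The heart of the change is the new ChunkLayoutTestInfo test helper, which each converted test uses to run once per ChunkLayOutVersion under JUnit 4's Parameterized runner. Its source is not reproduced in the diff below, so the following is only a sketch of one plausible shape implied by how the tests call it (a static chunkLayoutParameters() returning Iterable<Object[]>):

    // Sketch only: the real ChunkLayoutTestInfo added by this commit is
    // not shown in the diff; this illustrates the implied contract.
    package org.apache.hadoop.ozone.container.keyvalue;

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;

    public final class ChunkLayoutTestInfo {

      private ChunkLayoutTestInfo() {
      }

      // Wraps each layout in an Object[] so the JUnit 4 Parameterized
      // runner can pass it to each test class constructor.
      public static Iterable<Object[]> chunkLayoutParameters() {
        List<Object[]> params = new ArrayList<>();
        for (ChunkLayOutVersion layout : ChunkLayOutVersion.values()) {
          params.add(new Object[] {layout});
        }
        return params;
      }
    }

Each converted test class then declares a @Parameterized.Parameters factory delegating to this helper plus a constructor taking the ChunkLayOutVersion, as seen repeatedly in the diff below.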

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkLayOutVersion.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkLayOutVersion.java
index 97e4f69..a5bcc22 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkLayOutVersion.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkLayOutVersion.java
@@ -18,19 +18,45 @@
 package org.apache.hadoop.ozone.container.common.impl;
 
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.util.List;
 
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNABLE_TO_FIND_DATA_DIR;
+
 /**
  * Defines layout versions for the Chunks.
  */
 public enum ChunkLayOutVersion {
 
-  FILE_PER_CHUNK(1, "One file per chunk"),
-  FILE_PER_BLOCK(2, "One file per block");
+  FILE_PER_CHUNK(1, "One file per chunk") {
+    @Override
+    public File getChunkFile(ContainerData containerData, BlockID blockID,
+        ChunkInfo info) throws StorageContainerException {
+      File chunksLoc = verifyChunkDirExists(containerData);
+      return chunksLoc.toPath().resolve(info.getChunkName()).toFile();
+    }
+  },
+  FILE_PER_BLOCK(2, "One file per block") {
+    @Override
+    public File getChunkFile(ContainerData containerData, BlockID blockID,
+        ChunkInfo info) throws StorageContainerException {
+      File chunkDir = verifyChunkDirExists(containerData);
+      return new File(chunkDir, blockID.getLocalID() + ".block");
+    }
+  };
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ChunkLayOutVersion.class);
 
   private static final ChunkLayOutVersion
       DEFAULT_LAYOUT = ChunkLayOutVersion.FILE_PER_BLOCK;
@@ -91,8 +117,31 @@ public enum ChunkLayOutVersion {
     return description;
   }
 
+  public abstract File getChunkFile(ContainerData containerData,
+      BlockID blockID, ChunkInfo info) throws StorageContainerException;
+
   @Override
   public String toString() {
     return "ChunkLayout:v" + version;
   }
+
+  private static File verifyChunkDirExists(ContainerData containerData)
+      throws StorageContainerException {
+    Preconditions.checkNotNull(containerData, "Container data can't be null");
+
+    String chunksPath = containerData.getChunksPath();
+    if (chunksPath == null) {
+      LOG.error("Chunks path is null in the container data");
+      throw new StorageContainerException("Unable to get Chunks directory.",
+          UNABLE_TO_FIND_DATA_DIR);
+    }
+    File chunksLoc = new File(chunksPath);
+    if (!chunksLoc.exists()) {
+      LOG.error("Chunks path does not exist");
+      throw new StorageContainerException("Unable to get Chunks directory.",
+          UNABLE_TO_FIND_DATA_DIR);
+    }
+    return chunksLoc;
+  }
+
 }
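
With the per-constant overrides above, callers no longer go through ChunkUtils.getChunkFile; they ask the layout itself. A minimal usage sketch (hypothetical variable names; the container's chunks directory must exist, otherwise verifyChunkDirExists throws StorageContainerException):

    // Hypothetical usage, not part of the commit:
    ChunkLayOutVersion layout = containerData.getLayOutVersion();
    File chunkFile = layout.getChunkFile(containerData, blockID, chunkInfo);
    // FILE_PER_CHUNK resolves to <chunksPath>/<chunkName>
    // FILE_PER_BLOCK resolves to <chunksPath>/<localID>.block
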
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
index 0ad4b71..00627ff 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
@@ -72,6 +72,9 @@ public abstract class ContainerData {
   // This can hold information like volume name, owner etc.,
   private final Map<String, String> metadata;
 
+  // Path to Physical file system where chunks are stored.
+  private String chunksPath;
+
   // State of the Container
   private ContainerDataProto.State state;
 
@@ -228,6 +231,22 @@ public abstract class ContainerData {
   }
 
   /**
+   * Get chunks path.
+   * @return - Path where chunks are stored
+   */
+  public String getChunksPath() {
+    return chunksPath;
+  }
+
+  /**
+   * Set chunks Path.
+   * @param chunkPath - File path.
+   */
+  public void setChunksPath(String chunkPath) {
+    this.chunksPath = chunkPath;
+  }
+
+  /**
    * Add/Update metadata.
    * We should hold the container lock before updating the metadata as this
    * will be persisted on disk. Unless, we are reconstructing ContainerData
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
index b8f5f11..1e53daa 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
@@ -30,17 +30,17 @@ import org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
 import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.Arrays;
 
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -57,7 +57,8 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_
 
 public class KeyValueContainerCheck {
 
-  private static final Logger LOG = LoggerFactory.getLogger(Container.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(KeyValueContainerCheck.class);
 
   private long containerID;
   private KeyValueContainerData onDiskContainerData; //loaded from fs/disk
@@ -213,10 +214,9 @@ public class KeyValueContainerCheck {
      */
     Preconditions.checkState(onDiskContainerData != null,
         "invoke loadContainerData prior to calling this function");
-    File dbFile;
-    File metaDir = new File(metadataPath);
 
-    dbFile = KeyValueContainerLocationUtil
+    File metaDir = new File(metadataPath);
+    File dbFile = KeyValueContainerLocationUtil
         .getContainerDBFile(metaDir, containerID);
 
     if (!dbFile.exists() || !dbFile.canRead()) {
@@ -227,6 +227,9 @@ public class KeyValueContainerCheck {
     }
 
     onDiskContainerData.setDbFile(dbFile);
+
+    ChunkLayOutVersion layout = onDiskContainerData.getLayOutVersion();
+
     try(ReferenceCountedDB db =
             BlockUtils.getDB(onDiskContainerData, checkConfig);
         KeyValueBlockIterator kvIter = new KeyValueBlockIterator(containerID,
@@ -235,8 +238,9 @@ public class KeyValueContainerCheck {
       while(kvIter.hasNext()) {
         BlockData block = kvIter.nextBlock();
         for(ContainerProtos.ChunkInfo chunk : block.getChunks()) {
-          File chunkFile = ChunkUtils.getChunkFile(onDiskContainerData,
-              ChunkInfo.getFromProtoBuf(chunk));
+          File chunkFile = layout.getChunkFile(onDiskContainerData,
+              block.getBlockID(), ChunkInfo.getFromProtoBuf(chunk));
+
           if (!chunkFile.exists()) {
             // concurrent mutation in Block DB? lookup the block again.
             byte[] bdata = db.getStore().get(
@@ -246,52 +250,77 @@ public class KeyValueContainerCheck {
                   + chunkFile.getAbsolutePath());
             }
           } else if (chunk.getChecksumData().getType()
-              != ContainerProtos.ChecksumType.NONE){
-            int length = chunk.getChecksumData().getChecksumsList().size();
-            ChecksumData cData = new ChecksumData(
-                chunk.getChecksumData().getType(),
-                chunk.getChecksumData().getBytesPerChecksum(),
-                chunk.getChecksumData().getChecksumsList());
-            Checksum cal = new Checksum(cData.getChecksumType(),
-                cData.getBytesPerChecksum());
-            long bytesRead = 0;
-            byte[] buffer = new byte[cData.getBytesPerChecksum()];
-            try (InputStream fs = new FileInputStream(chunkFile)) {
-              for (int i = 0; i < length; i++) {
-                int v = fs.read(buffer);
-                if (v == -1) {
-                  break;
-                }
-                bytesRead += v;
-                throttler.throttle(v, canceler);
-                ByteString expected = cData.getChecksums().get(i);
-                ByteString actual = cal.computeChecksum(buffer, 0, v)
-                    .getChecksums().get(0);
-                if (!expected.equals(actual)) {
-                  throw new OzoneChecksumException(String
-                      .format("Inconsistent read for chunk=%s len=%d expected" +
-                              " checksum %s actual checksum %s for block %s",
-                          chunk.getChunkName(), chunk.getLen(),
-                          Arrays.toString(expected.toByteArray()),
-                          Arrays.toString(actual.toByteArray()),
-                          block.getBlockID()));
-                }
-
-              }
-              if (bytesRead != chunk.getLen()) {
-                throw new OzoneChecksumException(String
-                    .format("Inconsistent read for chunk=%s expected length=%d"
-                            + " actual length=%d for block %s",
-                        chunk.getChunkName(),
-                        chunk.getLen(), bytesRead, block.getBlockID()));
-              }
-            }
+              != ContainerProtos.ChecksumType.NONE) {
+            verifyChecksum(block, chunk, chunkFile, layout, throttler,
+                canceler);
           }
         }
       }
     }
   }
 
+  private static void verifyChecksum(BlockData block,
+      ContainerProtos.ChunkInfo chunk, File chunkFile,
+      ChunkLayOutVersion layout,
+      DataTransferThrottler throttler, Canceler canceler) throws IOException {
+    ChecksumData checksumData =
+        ChecksumData.getFromProtoBuf(chunk.getChecksumData());
+    int checksumCount = checksumData.getChecksums().size();
+    int bytesPerChecksum = checksumData.getBytesPerChecksum();
+    Checksum cal = new Checksum(checksumData.getChecksumType(),
+        bytesPerChecksum);
+    ByteBuffer buffer = ByteBuffer.allocate(bytesPerChecksum);
+    long bytesRead = 0;
+    try (FileChannel channel = FileChannel.open(chunkFile.toPath(),
+        ChunkUtils.READ_OPTIONS, ChunkUtils.NO_ATTRIBUTES)) {
+      if (layout == ChunkLayOutVersion.FILE_PER_BLOCK) {
+        channel.position(chunk.getOffset());
+      }
+      for (int i = 0; i < checksumCount; i++) {
+        // limit last read for FILE_PER_BLOCK, to avoid reading next chunk
+        if (layout == ChunkLayOutVersion.FILE_PER_BLOCK &&
+            i == checksumCount - 1 &&
+            chunk.getLen() % bytesPerChecksum != 0) {
+          buffer.limit((int) (chunk.getLen() % bytesPerChecksum));
+        }
+
+        int v = channel.read(buffer);
+        if (v == -1) {
+          break;
+        }
+        bytesRead += v;
+        buffer.flip();
+
+        throttler.throttle(v, canceler);
+
+        ByteString expected = checksumData.getChecksums().get(i);
+        ByteString actual = cal.computeChecksum(buffer)
+            .getChecksums().get(0);
+        if (!expected.equals(actual)) {
+          throw new OzoneChecksumException(String
+              .format("Inconsistent read for chunk=%s" +
+                  " checksum item %d" +
+                  " expected checksum %s" +
+                  " actual checksum %s" +
+                  " for block %s",
+                  ChunkInfo.getFromProtoBuf(chunk),
+                  i,
+                  Arrays.toString(expected.toByteArray()),
+                  Arrays.toString(actual.toByteArray()),
+                  block.getBlockID()));
+        }
+
+      }
+      if (bytesRead != chunk.getLen()) {
+        throw new OzoneChecksumException(String
+            .format("Inconsistent read for chunk=%s expected length=%d"
+                    + " actual length=%d for block %s",
+                chunk.getChunkName(),
+                chunk.getLen(), bytesRead, block.getBlockID()));
+      }
+    }
+  }
+
   private void loadContainerData() throws IOException {
     File containerFile = KeyValueContainer
         .getContainerFile(metadataPath, containerID);
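
The rewritten verifyChecksum reads the chunk back in bytesPerChecksum-sized slices through a FileChannel and recomputes each stored checksum. Because a FILE_PER_BLOCK chunk is a byte range inside a shared block file, the channel is first positioned at chunk.getOffset(), and the final read is capped so it cannot spill into the next chunk. Worked through with assumed numbers (illustration only, not taken from the patch):

    // chunk.getLen()   = 2,500,000 bytes
    // bytesPerChecksum = 1,048,576 bytes (1 MiB)
    // checksumCount    = 3 stored checksums
    // reads: 1,048,576 + 1,048,576 + 402,848 bytes
    // (last buffer limited to 2,500,000 % 1,048,576 = 402,848 under
    //  FILE_PER_BLOCK, so bytesRead ends up equal to chunk.getLen())
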
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
index 88fcb6f..1e373de 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
@@ -56,9 +56,6 @@ public class KeyValueContainerData extends ContainerData {
   // Path to Container metadata Level DB/RocksDB Store and .container file.
   private String metadataPath;
 
-  // Path to Physical file system where chunks are stored.
-  private String chunksPath;
-
   //Type of DB used to store key to chunks mapping
   private String containerDBType;
 
@@ -165,22 +162,6 @@ public class KeyValueContainerData extends ContainerData {
   }
 
   /**
-   * Get chunks path.
-   * @return - Path where chunks are stored
-   */
-  public String getChunksPath() {
-    return chunksPath;
-  }
-
-  /**
-   * Set chunks Path.
-   * @param chunkPath - File path.
-   */
-  public void setChunksPath(String chunkPath) {
-    this.chunksPath = chunkPath;
-  }
-
-  /**
    * Returns the DBType used for the container.
    * @return containerDBType
    */
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
index 249aec3..df0279f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
@@ -43,11 +43,9 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 
 import static java.nio.channels.FileChannel.open;
 import static java.util.Collections.unmodifiableSet;
@@ -56,7 +54,6 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Res
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.IO_EXCEPTION;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_ALGORITHM;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNABLE_TO_FIND_CHUNK;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNABLE_TO_FIND_DATA_DIR;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,11 +75,11 @@ public final class ChunkUtils {
           StandardOpenOption.WRITE,
           StandardOpenOption.SPARSE
       ));
-  private static final Set<? extends OpenOption> READ_OPTIONS =
+  public static final Set<? extends OpenOption> READ_OPTIONS =
       unmodifiableSet(EnumSet.of(
           StandardOpenOption.READ
       ));
-  private static final FileAttribute<?>[] NO_ATTRIBUTES = {};
+  public static final FileAttribute<?>[] NO_ATTRIBUTES = {};
 
   /** Never constructed. **/
   private ChunkUtils() {
@@ -233,38 +230,6 @@ public final class ChunkUtils {
   }
 
   /**
-   * Validates that Path to chunk file exists.
-   *
-   * @param containerData - Container Data
-   * @param info - Chunk info
-   * @return - File.
-   */
-  public static File getChunkFile(KeyValueContainerData containerData,
-      ChunkInfo info) throws StorageContainerException {
-    File chunksLoc = verifyChunkDirExists(containerData);
-    return chunksLoc.toPath().resolve(info.getChunkName()).toFile();
-  }
-
-  public static File verifyChunkDirExists(KeyValueContainerData containerData)
-      throws StorageContainerException {
-    Preconditions.checkNotNull(containerData, "Container data can't be null");
-
-    String chunksPath = containerData.getChunksPath();
-    if (chunksPath == null) {
-      LOG.error("Chunks path is null in the container data");
-      throw new StorageContainerException("Unable to get Chunks directory.",
-          UNABLE_TO_FIND_DATA_DIR);
-    }
-    File chunksLoc = new File(chunksPath);
-    if (!chunksLoc.exists()) {
-      LOG.error("Chunks path does not exist");
-      throw new StorageContainerException("Unable to get Chunks directory.",
-          UNABLE_TO_FIND_DATA_DIR);
-    }
-    return chunksLoc;
-  }
-
-  /**
    * Checks if we are getting a request to overwrite an existing range of
    * chunk.
    *
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
index 37bc180..2aa89a4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
@@ -103,7 +103,7 @@ public class FilePerBlockStrategy implements ChunkManager {
     KeyValueContainerData containerData = (KeyValueContainerData) container
         .getContainerData();
 
-    File chunkFile = getChunkFile(containerData, blockID);
+    File chunkFile = getChunkFile(container, blockID, info);
     boolean overwrite = validateChunkForOverwrite(chunkFile, info);
     long len = info.getLen();
     long offset = info.getOffset();
@@ -140,7 +140,7 @@ public class FilePerBlockStrategy implements ChunkManager {
     HddsVolume volume = containerData.getVolume();
     VolumeIOStats volumeIOStats = volume.getVolumeIOStats();
 
-    File chunkFile = getChunkFile(containerData, blockID);
+    File chunkFile = getChunkFile(container, blockID, info);
 
     long len = info.getLen();
     long offset = info.getOffset();
@@ -165,7 +165,7 @@ public class FilePerBlockStrategy implements ChunkManager {
   @Override
   public void finishWriteChunk(KeyValueContainer container, BlockID blockID,
       ChunkInfo info) throws IOException {
-    File chunkFile = getChunkFile(container.getContainerData(), blockID);
+    File chunkFile = getChunkFile(container, blockID, info);
     files.close(chunkFile);
   }
 
@@ -175,10 +175,8 @@ public class FilePerBlockStrategy implements ChunkManager {
     checkLayoutVersion(container);
 
     Preconditions.checkNotNull(blockID, "Block ID cannot be null.");
-    KeyValueContainerData containerData = (KeyValueContainerData) container
-        .getContainerData();
 
-    File file = getChunkFile(containerData, blockID);
+    File file = getChunkFile(container, blockID, info);
 
     // if the chunk file does not exist, it might have already been deleted.
     // The call might be because of reapply of transactions on datanode
@@ -198,6 +196,12 @@ public class FilePerBlockStrategy implements ChunkManager {
     LOG.info("Deleted block file: {}", file);
   }
 
+  private File getChunkFile(Container container, BlockID blockID,
+      ChunkInfo info) throws StorageContainerException {
+    return FILE_PER_BLOCK.getChunkFile(container.getContainerData(), blockID,
+        info);
+  }
+
   private static void checkFullDelete(ChunkInfo info, File chunkFile)
       throws StorageContainerException {
     long fileLength = chunkFile.length();
@@ -210,13 +214,6 @@ public class FilePerBlockStrategy implements ChunkManager {
     }
   }
 
-  private static File getChunkFile(
-      KeyValueContainerData containerData, BlockID blockID)
-      throws StorageContainerException {
-    File chunkDir = ChunkUtils.verifyChunkDirExists(containerData);
-    return new File(chunkDir, blockID.getLocalID() + ".block");
-  }
-
   private static final class OpenFiles {
 
     private static final RemovalListener<String, OpenFile> ON_REMOVE =
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
index 87604dd..834b883 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats;
@@ -91,13 +92,12 @@ public class FilePerChunkStrategy implements ChunkManager {
     Preconditions.checkNotNull(dispatcherContext);
     DispatcherContext.WriteChunkStage stage = dispatcherContext.getStage();
     try {
-
-      KeyValueContainerData containerData = (KeyValueContainerData) container
-          .getContainerData();
+      KeyValueContainer kvContainer = (KeyValueContainer) container;
+      KeyValueContainerData containerData = kvContainer.getContainerData();
       HddsVolume volume = containerData.getVolume();
       VolumeIOStats volumeIOStats = volume.getVolumeIOStats();
 
-      File chunkFile = ChunkUtils.getChunkFile(containerData, info);
+      File chunkFile = getChunkFile(kvContainer, blockID, info);
 
       boolean isOverwrite = ChunkUtils.validateChunkForOverwrite(
           chunkFile, info);
@@ -198,15 +198,15 @@ public class FilePerChunkStrategy implements ChunkManager {
 
     checkLayoutVersion(container);
 
-    KeyValueContainerData containerData = (KeyValueContainerData) container
-        .getContainerData();
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = kvContainer.getContainerData();
 
     HddsVolume volume = containerData.getVolume();
     VolumeIOStats volumeIOStats = volume.getVolumeIOStats();
 
     // In version1, we verify checksum if it is available and return data
     // of the chunk file.
-    File finalChunkFile = ChunkUtils.getChunkFile(containerData, info);
+    File finalChunkFile = getChunkFile(kvContainer, blockID, info);
 
     List<File> possibleFiles = new ArrayList<>();
     possibleFiles.add(finalChunkFile);
@@ -252,11 +252,10 @@ public class FilePerChunkStrategy implements ChunkManager {
     checkLayoutVersion(container);
 
     Preconditions.checkNotNull(blockID, "Block ID cannot be null.");
-    KeyValueContainerData containerData = (KeyValueContainerData) container
-        .getContainerData();
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
 
     // In version1, we have only chunk file.
-    File chunkFile = ChunkUtils.getChunkFile(containerData, info);
+    File chunkFile = getChunkFile(kvContainer, blockID, info);
 
     // if the chunk file does not exist, it might have already been deleted.
     // The call might be because of reapply of transactions on datanode
@@ -290,6 +289,12 @@ public class FilePerChunkStrategy implements ChunkManager {
     }
   }
 
+  private static File getChunkFile(KeyValueContainer container, BlockID blockID,
+      ChunkInfo info) throws StorageContainerException {
+    return FILE_PER_CHUNK.getChunkFile(container.getContainerData(), blockID,
+        info);
+  }
+
   /**
    * Returns the temporary chunkFile path.
    * @param chunkFile chunkFileName
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
index da711ce..fe49e84 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
@@ -101,5 +101,4 @@ public interface ChunkManager {
       ChunkInfo info) throws IOException {
     // no-op
   }
-
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
index 81e72df..6f159b4 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
@@ -115,10 +115,11 @@ public final class ContainerTestUtils {
   }
 
   public static KeyValueContainer getContainer(long containerId,
+      ChunkLayOutVersion layout,
       ContainerProtos.ContainerDataProto.State state) {
     KeyValueContainerData kvData =
         new KeyValueContainerData(containerId,
-            ChunkLayOutVersion.FILE_PER_CHUNK,
+            layout,
             (long) StorageUnit.GB.toBytes(5),
             UUID.randomUUID().toString(), UUID.randomUUID().toString());
     kvData.setState(state);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
index c1f4d9f..303ebd7 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
@@ -54,6 +55,8 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.File;
 import java.io.IOException;
@@ -75,6 +78,7 @@ import static org.mockito.Mockito.when;
 /**
  * Tests to test block deleting service.
  */
+@RunWith(Parameterized.class)
 public class TestBlockDeletingService {
 
   private static File testRoot;
@@ -82,6 +86,17 @@ public class TestBlockDeletingService {
   private static String clusterID;
   private Handler handler;
 
+  private final ChunkLayOutVersion layout;
+
+  public TestBlockDeletingService(ChunkLayOutVersion layout) {
+    this.layout = layout;
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> parameters() {
+    return ChunkLayoutTestInfo.chunkLayoutParameters();
+  }
+
   @BeforeClass
   public static void init() throws IOException {
     testRoot = GenericTestUtils
@@ -110,7 +125,7 @@ public class TestBlockDeletingService {
       conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, testRoot.getAbsolutePath());
       long containerID = ContainerTestHelper.getTestContainerID();
       KeyValueContainerData data = new KeyValueContainerData(containerID,
-          ChunkLayOutVersion.FILE_PER_CHUNK,
+          layout,
           ContainerTestHelper.CONTAINER_MAX_SIZE, UUID.randomUUID().toString(),
           UUID.randomUUID().toString());
       data.closeContainer();
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java
index 2588dd7..de80c6a 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java
@@ -21,8 +21,11 @@ package org.apache.hadoop.ozone.container.common;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
+import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static org.junit.Assert.assertEquals;
 
@@ -32,9 +35,22 @@ import java.util.concurrent.atomic.AtomicLong;
 /**
  * This class is used to test the KeyValueContainerData.
  */
+@RunWith(Parameterized.class)
 public class TestKeyValueContainerData {
 
   private static final long MAXSIZE = (long) StorageUnit.GB.toBytes(5);
+
+  private final ChunkLayOutVersion layout;
+
+  public TestKeyValueContainerData(ChunkLayOutVersion layout) {
+    this.layout = layout;
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> parameters() {
+    return ChunkLayoutTestInfo.chunkLayoutParameters();
+  }
+
   @Test
   public void testKeyValueData() {
     long containerId = 1L;
@@ -49,7 +65,7 @@ public class TestKeyValueContainerData {
     UUID datanodeId = UUID.randomUUID();
 
     KeyValueContainerData kvData = new KeyValueContainerData(containerId,
-        ChunkLayOutVersion.FILE_PER_CHUNK,
+        layout,
         MAXSIZE, pipelineId.toString(), datanodeId.toString());
 
     assertEquals(containerType, kvData.getContainerType());
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java
index 3b008df..d78cf12 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java
@@ -24,9 +24,12 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.File;
 import java.io.IOException;
@@ -41,6 +44,7 @@ import static org.junit.Assert.fail;
 /**
  * This class tests create/read .container files.
  */
+@RunWith(Parameterized.class)
 public class TestContainerDataYaml {
 
   private static long testContainerID = 1234;
@@ -53,6 +57,17 @@ public class TestContainerDataYaml {
   private static final String VOLUME_OWNER = "hdfs";
   private static final String CONTAINER_DB_TYPE = "RocksDB";
 
+  private final ChunkLayOutVersion layout;
+
+  public TestContainerDataYaml(ChunkLayOutVersion layout) {
+    this.layout = layout;
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> parameters() {
+    return ChunkLayoutTestInfo.chunkLayoutParameters();
+  }
+
   /**
    * Creates a .container file. cleanup() should be called at the end of the
    * test when container file is created.
@@ -63,7 +78,7 @@ public class TestContainerDataYaml {
     String containerPath = containerID + ".container";
 
     KeyValueContainerData keyValueContainerData = new KeyValueContainerData(
-        containerID, FILE_PER_CHUNK, MAXSIZE,
+        containerID, layout, MAXSIZE,
         UUID.randomUUID().toString(),
         UUID.randomUUID().toString());
     keyValueContainerData.setContainerDBType(CONTAINER_DB_TYPE);
@@ -104,7 +119,7 @@ public class TestContainerDataYaml {
     assertEquals(containerFile.getParent(), kvData.getChunksPath());
     assertEquals(ContainerProtos.ContainerDataProto.State.OPEN, kvData
         .getState());
-    assertEquals(FILE_PER_CHUNK, kvData.getLayOutVersion());
+    assertEquals(layout, kvData.getLayOutVersion());
     assertEquals(0, kvData.getMetadata().size());
     assertEquals(MAXSIZE, kvData.getMaxSize());
     assertEquals(MAXSIZE, kvData.getMaxSize());
@@ -135,7 +150,7 @@ public class TestContainerDataYaml {
     assertEquals(containerFile.getParent(), kvData.getChunksPath());
     assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED, kvData
         .getState());
-    assertEquals(FILE_PER_CHUNK, kvData.getLayOutVersion());
+    assertEquals(layout, kvData.getLayOutVersion());
     assertEquals(2, kvData.getMetadata().size());
     assertEquals(VOLUME_OWNER, kvData.getMetadata().get(OzoneConsts.VOLUME));
     assertEquals(OzoneConsts.OZONE,
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java
index 1dd4711..4cd0e51 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoosingPolicy;
+import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
@@ -42,11 +43,14 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.mockito.Mockito;
 
 /**
  * The class for testing container deletion choosing policy.
  */
+@RunWith(Parameterized.class)
 public class TestContainerDeletionChoosingPolicy {
   private static String path;
   private OzoneContainer ozoneContainer;
@@ -57,6 +61,17 @@ public class TestContainerDeletionChoosingPolicy {
   private static final int SERVICE_TIMEOUT_IN_MILLISECONDS = 0;
   private static final int SERVICE_INTERVAL_IN_MILLISECONDS = 1000;
 
+  private final ChunkLayOutVersion layout;
+
+  public TestContainerDeletionChoosingPolicy(ChunkLayOutVersion layout) {
+    this.layout = layout;
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> parameters() {
+    return ChunkLayoutTestInfo.chunkLayoutParameters();
+  }
+
   @Before
   public void init() throws Throwable {
     conf = new OzoneConfiguration();
@@ -82,7 +97,7 @@ public class TestContainerDeletionChoosingPolicy {
     int numContainers = 10;
     for (int i = 0; i < numContainers; i++) {
       KeyValueContainerData data = new KeyValueContainerData(i,
-          ChunkLayOutVersion.FILE_PER_CHUNK,
+          layout,
           ContainerTestHelper.CONTAINER_MAX_SIZE, UUID.randomUUID().toString(),
           UUID.randomUUID().toString());
       data.closeContainer();
@@ -140,7 +155,7 @@ public class TestContainerDeletionChoosingPolicy {
       long containerId = RandomUtils.nextLong();
       KeyValueContainerData data =
           new KeyValueContainerData(containerId,
-              ChunkLayOutVersion.FILE_PER_CHUNK,
+              layout,
               ContainerTestHelper.CONTAINER_MAX_SIZE,
               UUID.randomUUID().toString(),
               UUID.randomUUID().toString());
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index 1b0f70f..303252b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.ozone.container.common.impl;
 
 import com.google.common.collect.Maps;
-import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -41,6 +40,7 @@ import org.apache.hadoop.ozone.container.common.transport.server.ratis.Dispatche
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
@@ -59,16 +59,16 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -82,7 +82,6 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Res
 import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
 import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData;
 import static org.apache.hadoop.ozone.container.ContainerTestHelper.setDataChecksum;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 /**
@@ -90,6 +89,7 @@ import static org.junit.Assert.fail;
  * these tests are specific to {@link KeyValueContainer}. If a new {@link
  * ContainerProtos.ContainerType} is added, the tests need to be modified.
  */
+@RunWith(Parameterized.class)
 public class TestContainerPersistence {
   private static final String DATANODE_UUID = UUID.randomUUID().toString();
   private static final String SCM_ID = UUID.randomUUID().toString();
@@ -110,6 +110,17 @@ public class TestContainerPersistence {
   @Rule
   public Timeout testTimeout = new Timeout(300000);
 
+  private final ChunkLayOutVersion layout;
+
+  public TestContainerPersistence(ChunkLayOutVersion layout) {
+    this.layout = layout;
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> parameters() {
+    return ChunkLayoutTestInfo.chunkLayoutParameters();
+  }
+
   @BeforeClass
   public static void init() {
     conf = new OzoneConfiguration();
@@ -164,7 +175,7 @@ public class TestContainerPersistence {
     long commitBytesAfter = 0;
     long commitIncrement = 0;
     KeyValueContainerData data = new KeyValueContainerData(cID,
-        ChunkLayOutVersion.FILE_PER_BLOCK,
+        layout,
         ContainerTestHelper.CONTAINER_MAX_SIZE, UUID.randomUUID().toString(),
         UUID.randomUUID().toString());
     data.addMetadata("VOLUME", "shire");
@@ -422,39 +433,6 @@ public class TestContainerPersistence {
   }
 
   /**
-   * Test partial within a single chunk.
-   *
-   * @throws IOException
-   */
-  @Test
-  public void testPartialRead() throws Exception {
-    final int datalen = 1024;
-    final int start = datalen / 4;
-    final int length = datalen / 2;
-
-    long testContainerID = getTestContainerID();
-    Container container = addContainer(containerSet, testContainerID);
-
-    BlockID blockID = ContainerTestHelper.getTestBlockID(testContainerID);
-    ChunkInfo info = getChunk(
-        blockID.getLocalID(), 0, 0, datalen);
-    ChunkBuffer data = getData(datalen);
-    setDataChecksum(info, data);
-    chunkManager.writeChunk(container, blockID, info, data,
-        getDispatcherContext());
-
-    ChunkBuffer readData = chunkManager
-        .readChunk(container, blockID, info, getDispatcherContext());
-    assertEquals(data.rewind(), readData.rewind());
-
-    ChunkInfo info2 = getChunk(blockID.getLocalID(), 0, start, length);
-    ChunkBuffer readData2 = chunkManager
-        .readChunk(container, blockID, info2, getDispatcherContext());
-    assertEquals(length, info2.getLen());
-    assertEquals(data.duplicate(start, start + length), readData2.rewind());
-  }
-
-  /**
    * Writes a single chunk and tries to overwrite that chunk without over write
    * flag then re-tries with overwrite flag.
    *
@@ -492,50 +470,6 @@ public class TestContainerPersistence {
   }
 
   /**
-   * This test writes data as many small writes and tries to read back the data
-   * in a single large read.
-   *
-   * @throws IOException
-   * @throws NoSuchAlgorithmException
-   */
-  @Test
-  public void testMultipleWriteSingleRead() throws IOException,
-      NoSuchAlgorithmException {
-    final int datalen = 1024;
-    final int chunkCount = 1024;
-
-    long testContainerID = getTestContainerID();
-    Container container = addContainer(containerSet, testContainerID);
-
-    BlockID blockID = ContainerTestHelper.getTestBlockID(testContainerID);
-    MessageDigest oldSha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
-    for (int x = 0; x < chunkCount; x++) {
-      // we are writing to the same chunk file but at different offsets.
-      long offset = x * datalen;
-      ChunkInfo info = getChunk(
-          blockID.getLocalID(), 0, offset, datalen);
-      ChunkBuffer data = getData(datalen);
-      oldSha.update(data.toByteString().asReadOnlyByteBuffer());
-      data.rewind();
-      setDataChecksum(info, data);
-      chunkManager.writeChunk(container, blockID, info, data,
-          getDispatcherContext());
-    }
-
-    // Request to read the whole data in a single go.
-    ChunkInfo largeChunk = getChunk(blockID.getLocalID(), 0, 0,
-        datalen * chunkCount);
-    ChunkBuffer chunk =
-        chunkManager.readChunk(container, blockID, largeChunk,
-            getDispatcherContext());
-    ByteBuffer newdata = chunk.toByteString().asReadOnlyByteBuffer();
-    MessageDigest newSha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
-    newSha.update(newdata);
-    Assert.assertEquals(Hex.encodeHexString(oldSha.digest()),
-        Hex.encodeHexString(newSha.digest()));
-  }
-
-  /**
    * Writes a chunk and deletes it, re-reads to make sure it is gone.
    *
    * @throws IOException
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java
index b827d04..d0dde34 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java
@@ -26,10 +26,13 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerExcep
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.mockito.Mockito;
 
 import java.io.IOException;
@@ -53,10 +56,22 @@ import static org.junit.Assert.fail;
 /**
  * Class used to test ContainerSet operations.
  */
+@RunWith(Parameterized.class)
 public class TestContainerSet {
 
   private static final int FIRST_ID = 2;
 
+  private final ChunkLayOutVersion layout;
+
+  public TestContainerSet(ChunkLayOutVersion layout) {
+    this.layout = layout;
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> parameters() {
+    return ChunkLayoutTestInfo.chunkLayoutParameters();
+  }
+
   @Test
   public void testAddGetRemoveContainer() throws StorageContainerException {
     ContainerSet containerSet = new ContainerSet();
@@ -65,7 +80,7 @@ public class TestContainerSet {
         .ContainerDataProto.State.CLOSED;
 
     KeyValueContainerData kvData = new KeyValueContainerData(containerId,
-        ChunkLayOutVersion.FILE_PER_CHUNK,
+        layout,
         (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
         UUID.randomUUID().toString());
     kvData.setState(state);
@@ -154,7 +169,7 @@ public class TestContainerSet {
     ContainerSet containerSet = new ContainerSet();
     for (int i=0; i<10; i++) {
       KeyValueContainerData kvData = new KeyValueContainerData(i,
-          ChunkLayOutVersion.FILE_PER_CHUNK,
+          layout,
           (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
           UUID.randomUUID().toString());
       if (i%2 == 0) {
@@ -196,7 +211,7 @@ public class TestContainerSet {
     int containerCount = 50;
     for (int i = 0; i < containerCount; i++) {
       KeyValueContainerData kvData = new KeyValueContainerData(i,
-          ChunkLayOutVersion.FILE_PER_CHUNK,
+          layout,
           (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
           UUID.randomUUID().toString());
       if (random.nextBoolean()) {
@@ -289,7 +304,7 @@ public class TestContainerSet {
     ContainerSet containerSet = new ContainerSet();
     for (int i = FIRST_ID; i < FIRST_ID + 10; i++) {
       KeyValueContainerData kvData = new KeyValueContainerData(i,
-          ChunkLayOutVersion.FILE_PER_CHUNK,
+          layout,
           (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
           UUID.randomUUID().toString());
       if (i%2 == 0) {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
index 5a099be..44ebebc 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -47,6 +47,7 @@ import org.apache.hadoop.ozone.container.common.transport.server.ratis.Dispatche
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -54,6 +55,8 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.mockito.Mockito;
 
 import java.io.File;
@@ -73,11 +76,23 @@ import static org.mockito.Mockito.verify;
 /**
  * Test-cases to verify the functionality of HddsDispatcher.
  */
+@RunWith(Parameterized.class)
 public class TestHddsDispatcher {
 
   public static final Consumer<ContainerReplicaProto> NO_OP_ICR_SENDER =
       c -> {};
 
+  private final ChunkLayOutVersion layout;
+
+  public TestHddsDispatcher(ChunkLayOutVersion layout) {
+    this.layout = layout;
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> parameters() {
+    return ChunkLayoutTestInfo.chunkLayoutParameters();
+  }
+
   @Test
   public void testContainerCloseActionWhenFull() throws IOException {
     String testDir = GenericTestUtils.getTempPath(
@@ -97,7 +112,7 @@ public class TestHddsDispatcher {
       Mockito.when(stateMachine.getDatanodeDetails()).thenReturn(dd);
       Mockito.when(context.getParent()).thenReturn(stateMachine);
       KeyValueContainerData containerData = new KeyValueContainerData(1L,
-          ChunkLayOutVersion.FILE_PER_CHUNK,
+          layout,
           (long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
           dd.getUuidString());
       Container container = new KeyValueContainer(containerData, conf);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
index 1318a97..5182c1c 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with this
  * work for additional information regarding copyright ownership.  The ASF
@@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.container.common.statemachine
     .DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
+import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
@@ -35,6 +36,8 @@ import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 import java.util.UUID;
@@ -51,6 +54,7 @@ import static org.mockito.Mockito.when;
 /**
  * Test cases to verify CloseContainerCommandHandler in datanode.
  */
+@RunWith(Parameterized.class)
 public class TestCloseContainerCommandHandler {
 
   private static final long CONTAINER_ID = 123L;
@@ -66,6 +70,17 @@ public class TestCloseContainerCommandHandler {
   private CloseContainerCommandHandler subject =
       new CloseContainerCommandHandler();
 
+  private final ChunkLayOutVersion layout;
+
+  public TestCloseContainerCommandHandler(ChunkLayOutVersion layout) {
+    this.layout = layout;
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> parameters() {
+    return ChunkLayoutTestInfo.chunkLayoutParameters();
+  }
+
   @Before
   public void before() throws Exception {
     context = mock(StateContext.class);
@@ -77,7 +92,7 @@ public class TestCloseContainerCommandHandler {
     pipelineID = PipelineID.randomId();
 
     KeyValueContainerData data = new KeyValueContainerData(CONTAINER_ID,
-        ChunkLayOutVersion.FILE_PER_CHUNK, GB,
+        layout, GB,
         pipelineID.getId().toString(), null);
 
     container = new KeyValueContainer(data, new OzoneConfiguration());
@@ -226,4 +241,4 @@ public class TestCloseContainerCommandHandler {
         .addPort(restPort);
     return builder.build();
   }
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolumeChecker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolumeChecker.java
index fe3f3aa..e4cd60f 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolumeChecker.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolumeChecker.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
 import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
@@ -78,6 +79,8 @@ public class TestHddsVolumeChecker {
   public static final Logger LOG = LoggerFactory.getLogger(
       TestHddsVolumeChecker.class);
 
+  private static final int NUM_VOLUMES = 2;
+
   @Rule
   public TemporaryFolder folder = new TemporaryFolder();
 
@@ -89,6 +92,19 @@ public class TestHddsVolumeChecker {
 
   private OzoneConfiguration conf = new OzoneConfiguration();
 
+  /**
+   * When null, the check call should throw an exception.
+   */
+  private final VolumeCheckResult expectedVolumeHealth;
+
+  private final ChunkLayOutVersion layout;
+
+  public TestHddsVolumeChecker(VolumeCheckResult result,
+      ChunkLayOutVersion layout) {
+    this.expectedVolumeHealth = result;
+    this.layout = layout;
+  }
+
   @Before
   public void setup() throws IOException {
     conf = new OzoneConfiguration();
@@ -106,28 +122,19 @@ public class TestHddsVolumeChecker {
   /**
    * Run each test case for each possible value of {@link VolumeCheckResult}.
    * Including "null" for 'throw exception'.
-   * @return
    */
-  @Parameters(name="{0}")
+  @Parameters
   public static Collection<Object[]> data() {
     List<Object[]> values = new ArrayList<>();
-    for (VolumeCheckResult result : VolumeCheckResult.values()) {
-      values.add(new Object[] {result});
+    for (ChunkLayOutVersion layout : ChunkLayOutVersion.values()) {
+      for (VolumeCheckResult result : VolumeCheckResult.values()) {
+        values.add(new Object[]{result, layout});
+      }
+      values.add(new Object[]{null, layout});
     }
-    values.add(new Object[] {null});
     return values;
   }
 
-  /**
-   * When null, the check call should throw an exception.
-   */
-  private final VolumeCheckResult expectedVolumeHealth;
-  private static final int NUM_VOLUMES = 2;
-
-
-  public TestHddsVolumeChecker(VolumeCheckResult expectedVolumeHealth) {
-    this.expectedVolumeHealth = expectedVolumeHealth;
-  }
 
   /**
    * Test {@link HddsVolumeChecker#checkVolume} propagates the
@@ -233,7 +240,8 @@ public class TestHddsVolumeChecker {
     for (ContainerDataProto.State state : ContainerDataProto.State.values()) {
       if (!state.equals(ContainerDataProto.State.INVALID)) {
         // add containers to the created volume
-        Container container = ContainerTestUtils.getContainer(++i, state);
+        Container container = ContainerTestUtils.getContainer(++i, layout,
+            state);
         container.getContainerData()
             .setVolume(volumeSet.getVolumeMap().get(volRootDir.getPath()));
         ((KeyValueContainerData) container.getContainerData())
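
TestHddsVolumeChecker was already parameterized, so instead of adding a second runner the patch widens data() to the cross product of layouts and volume-check results, plus one null result per layout for the exception path. Assuming two ChunkLayOutVersion values and three VolumeCheckResult values, the size of the resulting matrix works out as follows (a sketch, not code from the patch):

    public final class ParameterMatrixSketch {
      public static void main(String[] args) {
        int layouts = 2;  // assumption: FILE_PER_CHUNK and FILE_PER_BLOCK
        int results = 3;  // assumption: three VolumeCheckResult values
        // Each layout pairs with every result, plus one null entry for
        // the "check should throw" case.
        System.out.println(layouts * (results + 1));  // 8 parameter sets
      }
    }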
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/ChunkLayoutTestInfo.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/ChunkLayoutTestInfo.java
new file mode 100644
index 0000000..b8bafb2
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/ChunkLayoutTestInfo.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
+import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerDummyImpl;
+import org.apache.hadoop.ozone.container.keyvalue.impl.FilePerBlockStrategy;
+import org.apache.hadoop.ozone.container.keyvalue.impl.FilePerChunkStrategy;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
+
+import java.io.File;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_PERSISTDATA;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_LAYOUT_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test parameters covering the different chunk layout implementations.
+ */
+public enum ChunkLayoutTestInfo {
+
+  DUMMY {
+    @Override
+    public ChunkManager createChunkManager(boolean sync) {
+      return new ChunkManagerDummyImpl();
+    }
+
+    @Override
+    public void validateFileCount(File dir, long blockCount, long chunkCount) {
+      assertFileCount(dir, 0);
+    }
+
+    @Override
+    public ChunkLayOutVersion getLayout() {
+      return null;
+    }
+
+    @Override
+    public void updateConfig(OzoneConfiguration config) {
+      config.setBoolean(HDDS_CONTAINER_PERSISTDATA, false);
+    }
+  },
+
+  FILE_PER_CHUNK {
+    public ChunkManager createChunkManager(boolean sync) {
+      return new FilePerChunkStrategy(sync);
+    }
+
+    @Override
+    public void validateFileCount(File dir, long blockCount, long chunkCount) {
+      assertFileCount(dir, chunkCount);
+    }
+
+    @Override
+    public ChunkLayOutVersion getLayout() {
+      return ChunkLayOutVersion.FILE_PER_CHUNK;
+    }
+  },
+
+  FILE_PER_BLOCK {
+    public ChunkManager createChunkManager(boolean sync) {
+      return new FilePerBlockStrategy(sync);
+    }
+
+    @Override
+    public void validateFileCount(File dir, long blockCount, long chunkCount) {
+      assertFileCount(dir, blockCount);
+    }
+
+    @Override
+    public ChunkLayOutVersion getLayout() {
+      return ChunkLayOutVersion.FILE_PER_BLOCK;
+    }
+  };
+
+  public abstract ChunkManager createChunkManager(boolean sync);
+
+  public abstract void validateFileCount(File dir, long blockCount,
+      long chunkCount);
+
+  public abstract ChunkLayOutVersion getLayout();
+
+  public void updateConfig(OzoneConfiguration config) {
+    config.set(OZONE_SCM_CHUNK_LAYOUT_KEY, getLayout().name());
+  }
+
+  private static void assertFileCount(File dir, long count) {
+    assertNotNull(dir);
+    assertTrue(dir.exists());
+
+    File[] files = dir.listFiles();
+    assertNotNull(files);
+    assertEquals(count, files.length);
+  }
+
+  public static Iterable<Object[]> chunkLayoutParameters() {
+    return ChunkLayOutVersion.getAllVersions().stream()
+        .map(each -> new Object[] {each})
+        .collect(toList());
+  }
+}
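
ChunkLayoutTestInfo is an enum acting as a strategy object: a test holds one constant and delegates every layout-specific decision (configuration, ChunkManager construction, file-count expectations) to it, which is what lets the shared test bodies later in this patch avoid per-layout branching. A sketch of the intended call pattern, using only the methods defined above (the directory path is hypothetical, and the writes themselves are elided):

    import java.io.File;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
    import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;

    public final class LayoutStrategySketch {
      public static void main(String[] args) {
        ChunkLayoutTestInfo info = ChunkLayoutTestInfo.FILE_PER_BLOCK;

        OzoneConfiguration conf = new OzoneConfiguration();
        info.updateConfig(conf);  // selects the layout under test

        ChunkManager chunkManager = info.createChunkManager(true);
        // ... write 2 blocks of 4 chunks each with chunkManager ...

        // Expected file count depends on the layout: 2 for FILE_PER_BLOCK,
        // 8 for FILE_PER_CHUNK, 0 for DUMMY.
        File chunksDir = new File("/tmp/example-chunks");  // hypothetical
        info.validateFileCount(chunksDir, 2, 8);
      }
    }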
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java
index 40a4219..ab4d5f6 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java
@@ -56,6 +56,8 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_METADATA_STORE_IMPL_LEVELDB;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_METADATA_STORE_IMPL_ROCKSDB;
+import static org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion.FILE_PER_BLOCK;
+import static org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion.FILE_PER_CHUNK;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -73,16 +75,21 @@ public class TestKeyValueBlockIterator {
   private File testRoot;
 
   private final String storeImpl;
+  private final ChunkLayOutVersion layout;
 
-  public TestKeyValueBlockIterator(String metadataImpl) {
+  public TestKeyValueBlockIterator(String metadataImpl,
+      ChunkLayOutVersion layout) {
     this.storeImpl = metadataImpl;
+    this.layout = layout;
   }
 
   @Parameterized.Parameters
   public static Collection<Object[]> data() {
     return Arrays.asList(new Object[][] {
-        {OZONE_METADATA_STORE_IMPL_LEVELDB},
-        {OZONE_METADATA_STORE_IMPL_ROCKSDB}});
+        {OZONE_METADATA_STORE_IMPL_LEVELDB, FILE_PER_CHUNK},
+        {OZONE_METADATA_STORE_IMPL_ROCKSDB, FILE_PER_CHUNK},
+        {OZONE_METADATA_STORE_IMPL_LEVELDB, FILE_PER_BLOCK},
+        {OZONE_METADATA_STORE_IMPL_ROCKSDB, FILE_PER_BLOCK}});
   }
 
   @Before
@@ -250,7 +257,7 @@ public class TestKeyValueBlockIterator {
       normalBlocks, int deletedBlocks) throws
       Exception {
     containerData = new KeyValueContainerData(containerId,
-        ChunkLayOutVersion.FILE_PER_CHUNK,
+        layout,
         (long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
         UUID.randomUUID().toString());
     container = new KeyValueContainer(containerData, conf);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
index b589476..affa83a 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
@@ -48,6 +48,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.mockito.Mockito;
 import org.rocksdb.Options;
 
@@ -75,6 +77,7 @@ import static org.mockito.Mockito.mock;
 /**
  * Class to test KeyValue Container operations.
  */
+@RunWith(Parameterized.class)
 public class TestKeyValueContainer {
 
   @Rule
@@ -88,6 +91,17 @@ public class TestKeyValueContainer {
   private KeyValueContainer keyValueContainer;
   private UUID datanodeId;
 
+  private final ChunkLayOutVersion layout;
+
+  public TestKeyValueContainer(ChunkLayOutVersion layout) {
+    this.layout = layout;
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> parameters() {
+    return ChunkLayoutTestInfo.chunkLayoutParameters();
+  }
+
   @Before
   public void setUp() throws Exception {
     conf = new OzoneConfiguration();
@@ -102,7 +116,7 @@ public class TestKeyValueContainer {
         .thenReturn(hddsVolume);
 
     keyValueContainerData = new KeyValueContainerData(1L,
-        ChunkLayOutVersion.FILE_PER_CHUNK,
+        layout,
         (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
         datanodeId.toString());
 
@@ -112,7 +126,7 @@ public class TestKeyValueContainer {
   @Test
   public void testBlockIterator() throws Exception{
     keyValueContainerData = new KeyValueContainerData(100L,
-        ChunkLayOutVersion.FILE_PER_CHUNK,
+        layout,
         (long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
         datanodeId.toString());
     keyValueContainer = new KeyValueContainer(
@@ -158,7 +172,6 @@ public class TestKeyValueContainer {
     }
   }
 
-  @SuppressWarnings("RedundantCast")
   @Test
   public void testCreateContainer() throws Exception {
 
@@ -226,7 +239,7 @@ public class TestKeyValueContainer {
     //create a new one
     KeyValueContainerData containerData =
         new KeyValueContainerData(containerId,
-            ChunkLayOutVersion.FILE_PER_CHUNK,
+            keyValueContainerData.getLayOutVersion(),
             keyValueContainerData.getMaxSize(), UUID.randomUUID().toString(),
             datanodeId.toString());
     KeyValueContainer container = new KeyValueContainer(containerData, conf);
@@ -407,7 +420,7 @@ public class TestKeyValueContainer {
 
     // Create Container 2
     keyValueContainerData = new KeyValueContainerData(2L,
-        ChunkLayOutVersion.FILE_PER_CHUNK,
+        layout,
         (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
         datanodeId.toString());
 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
index 549b149..0bd5b07 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.container.keyvalue;
 
 import com.google.common.primitives.Longs;
 import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -32,14 +31,11 @@ import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
-import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
-import org.apache.hadoop.ozone.container.keyvalue.impl.FilePerChunkStrategy;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScrubberConfiguration;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -49,6 +45,8 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.RandomAccessFile;
@@ -74,28 +72,44 @@ import static org.junit.Assert.assertFalse;
  * Basic sanity test for the KeyValueContainerCheck class.
  */
 @RunWith(Parameterized.class) public class TestKeyValueContainerCheck {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestKeyValueContainerCheck.class);
+
   private final String storeImpl;
+  private final ChunkLayoutTestInfo chunkManagerTestInfo;
   private KeyValueContainer container;
   private KeyValueContainerData containerData;
   private MutableVolumeSet volumeSet;
   private OzoneConfiguration conf;
   private File testRoot;
+  private ChunkManager chunkManager;
 
-  public TestKeyValueContainerCheck(String metadataImpl) {
+  public TestKeyValueContainerCheck(String metadataImpl,
+      ChunkLayoutTestInfo chunkManagerTestInfo) {
     this.storeImpl = metadataImpl;
+    this.chunkManagerTestInfo = chunkManagerTestInfo;
   }
 
   @Parameterized.Parameters public static Collection<Object[]> data() {
-    return Arrays.asList(new Object[][] {{OZONE_METADATA_STORE_IMPL_LEVELDB},
-        {OZONE_METADATA_STORE_IMPL_ROCKSDB}});
+    return Arrays.asList(new Object[][] {
+        {OZONE_METADATA_STORE_IMPL_LEVELDB, ChunkLayoutTestInfo.FILE_PER_CHUNK},
+        {OZONE_METADATA_STORE_IMPL_LEVELDB, ChunkLayoutTestInfo.FILE_PER_BLOCK},
+        {OZONE_METADATA_STORE_IMPL_ROCKSDB, ChunkLayoutTestInfo.FILE_PER_CHUNK},
+        {OZONE_METADATA_STORE_IMPL_ROCKSDB, ChunkLayoutTestInfo.FILE_PER_BLOCK}
+    });
   }
 
   @Before public void setUp() throws Exception {
+    LOG.info("Testing store:{} layout:{}",
+        storeImpl, chunkManagerTestInfo.getLayout());
     this.testRoot = GenericTestUtils.getRandomizedTestDir();
     conf = new OzoneConfiguration();
     conf.set(HDDS_DATANODE_DIR_KEY, testRoot.getAbsolutePath());
     conf.set(OZONE_METADATA_STORE_IMPL, storeImpl);
+    chunkManagerTestInfo.updateConfig(conf);
     volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf);
+    chunkManager = chunkManagerTestInfo.createChunkManager(true);
   }
 
   @After public void teardown() {
@@ -168,8 +182,10 @@ import static org.junit.Assert.assertFalse;
       BlockData block = kvIter.nextBlock();
       assertFalse(block.getChunks().isEmpty());
       ContainerProtos.ChunkInfo c = block.getChunks().get(0);
-      File chunkFile = ChunkUtils.getChunkFile(containerData,
-          ChunkInfo.getFromProtoBuf(c));
+      BlockID blockID = block.getBlockID();
+      ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(c);
+      File chunkFile = chunkManagerTestInfo.getLayout()
+          .getChunkFile(containerData, blockID, chunkInfo);
       long length = chunkFile.length();
       assertTrue(length > 0);
       // forcefully truncate the file to induce failure.
@@ -206,26 +222,25 @@ import static org.junit.Assert.assertFalse;
         bytesPerChecksum);
     byte[] chunkData = RandomStringUtils.randomAscii(chunkLen).getBytes();
     ChecksumData checksumData = checksum.computeChecksum(chunkData);
+    DispatcherContext writeStage = new DispatcherContext.Builder()
+        .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
+        .build();
+    DispatcherContext commitStage = new DispatcherContext.Builder()
+        .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
+        .build();
 
     containerData = new KeyValueContainerData(containerId,
-        ChunkLayOutVersion.FILE_PER_CHUNK,
-        (long) StorageUnit.BYTES.toBytes(
-            chunksPerBlock * chunkLen * totalBlocks),
+        chunkManagerTestInfo.getLayout(),
+        chunksPerBlock * chunkLen * totalBlocks,
         UUID.randomUUID().toString(), UUID.randomUUID().toString());
     container = new KeyValueContainer(containerData, conf);
     container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
         UUID.randomUUID().toString());
     try (ReferenceCountedDB metadataStore = BlockUtils.getDB(containerData,
         conf)) {
-      ChunkManager chunkManager = new FilePerChunkStrategy(true);
-
       assertNotNull(containerData.getChunksPath());
       File chunksPath = new File(containerData.getChunksPath());
-      assertTrue(chunksPath.exists());
-      // Initially chunks folder should be empty.
-      File[] chunkFilesBefore = chunksPath.listFiles();
-      assertNotNull(chunkFilesBefore);
-      assertEquals(0, chunkFilesBefore.length);
+      chunkManagerTestInfo.validateFileCount(chunksPath, 0, 0);
 
       List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>();
       for (int i = 0; i < totalBlocks; i++) {
@@ -235,19 +250,14 @@ import static org.junit.Assert.assertFalse;
         chunkList.clear();
         for (long chunkCount = 0; chunkCount < chunksPerBlock; chunkCount++) {
           String chunkName = strBlock + i + strChunk + chunkCount;
-          ChunkInfo info = new ChunkInfo(chunkName, 0, chunkLen);
+          long offset = chunkCount * chunkLen;
+          ChunkInfo info = new ChunkInfo(chunkName, offset, chunkLen);
           info.setChecksumData(checksumData);
           chunkList.add(info.getProtoBufMessage());
-          chunkManager
-              .writeChunk(container, blockID, info, ByteBuffer.wrap(chunkData),
-                  new DispatcherContext.Builder()
-                      .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
-                      .build());
-          chunkManager
-              .writeChunk(container, blockID, info, ByteBuffer.wrap(chunkData),
-                  new DispatcherContext.Builder()
-                      .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
-                      .build());
+          chunkManager.writeChunk(container, blockID, info,
+              ByteBuffer.wrap(chunkData), writeStage);
+          chunkManager.writeChunk(container, blockID, info,
+              ByteBuffer.wrap(chunkData), commitStage);
         }
         blockData.setChunks(chunkList);
 
@@ -263,10 +273,8 @@ import static org.junit.Assert.assertFalse;
         }
       }
 
-      File[] chunkFilesAfter = chunksPath.listFiles();
-      assertNotNull(chunkFilesAfter);
-      assertEquals((deletedBlocks + normalBlocks) * chunksPerBlock,
-          chunkFilesAfter.length);
+      chunkManagerTestInfo.validateFileCount(chunksPath, totalBlocks,
+          totalBlocks * chunksPerBlock);
     }
   }
 
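FilePerChunkStrategy handles a write in two stages: WRITE_DATA materializes a temporary chunk file and COMMIT_DATA renames it into place, so the test issues both calls per chunk. Building the two DispatcherContext objects once outside the loop, as this hunk does, avoids reconstructing identical contexts on every iteration. Extracted into a helper, the reusable pattern looks roughly like this (a sketch using APIs from the patch, not code it adds):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hdds.client.BlockID;
    import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
    import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
    import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
    import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;

    final class TwoStageWriteSketch {
      private TwoStageWriteSketch() { }

      /** Writes one chunk in the WRITE_DATA stage, then commits it. */
      static void writeAndCommit(ChunkManager chunkManager,
          KeyValueContainer container, BlockID blockID, ChunkInfo info,
          byte[] chunkData) throws Exception {
        DispatcherContext writeStage = new DispatcherContext.Builder()
            .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
            .build();
        DispatcherContext commitStage = new DispatcherContext.Builder()
            .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
            .build();
        chunkManager.writeChunk(container, blockID, info,
            ByteBuffer.wrap(chunkData), writeStage);
        chunkManager.writeChunk(container, blockID, info,
            ByteBuffer.wrap(chunkData), commitStage);
      }
    }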
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMarkUnhealthy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMarkUnhealthy.java
index a6d796d..ef78fcc 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMarkUnhealthy.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMarkUnhealthy.java
@@ -35,6 +35,8 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,6 +57,7 @@ import static org.mockito.Mockito.mock;
  * Tests unhealthy container functionality in the {@link KeyValueContainer}
  * class.
  */
+@RunWith(Parameterized.class)
 public class TestKeyValueContainerMarkUnhealthy {
   public static final Logger LOG = LoggerFactory.getLogger(
       TestKeyValueContainerMarkUnhealthy.class);
@@ -76,6 +79,17 @@ public class TestKeyValueContainerMarkUnhealthy {
   private KeyValueContainer keyValueContainer;
   private UUID datanodeId;
 
+  private final ChunkLayOutVersion layout;
+
+  public TestKeyValueContainerMarkUnhealthy(ChunkLayOutVersion layout) {
+    this.layout = layout;
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> parameters() {
+    return ChunkLayoutTestInfo.chunkLayoutParameters();
+  }
+
   @Before
   public void setUp() throws Exception {
     conf = new OzoneConfiguration();
@@ -90,7 +104,7 @@ public class TestKeyValueContainerMarkUnhealthy {
         .thenReturn(hddsVolume);
 
     keyValueContainerData = new KeyValueContainerData(1L,
-        ChunkLayOutVersion.FILE_PER_CHUNK,
+        layout,
         (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
         datanodeId.toString());
     final File metaDir = GenericTestUtils.getRandomizedTestDir();
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index fb27e88..d9e7f09 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
@@ -39,13 +38,14 @@ import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestRule;
 import org.junit.rules.Timeout;
 
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.mockito.Mockito;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys
@@ -66,6 +66,7 @@ import java.util.UUID;
 /**
  * Unit tests for {@link KeyValueHandler}.
  */
+@RunWith(Parameterized.class)
 public class TestKeyValueHandler {
 
   @Rule
@@ -76,13 +77,21 @@ public class TestKeyValueHandler {
 
   private final static String DATANODE_UUID = UUID.randomUUID().toString();
 
-  private final String baseDir = MiniDFSCluster.getBaseDirectory();
-  private final String volume = baseDir + "disk1";
-
   private static final long DUMMY_CONTAINER_ID = 9999;
 
-  @BeforeClass
-  public static void setup() throws StorageContainerException {
+  private final ChunkLayOutVersion layout;
+
+  public TestKeyValueHandler(ChunkLayOutVersion layout) {
+    this.layout = layout;
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> parameters() {
+    return ChunkLayoutTestInfo.chunkLayoutParameters();
+  }
+
+  @Before
+  public void setup() throws StorageContainerException {
     // Create mock HddsDispatcher and KeyValueHandler.
     handler = Mockito.mock(KeyValueHandler.class);
     dispatcher = Mockito.mock(HddsDispatcher.class);
@@ -101,11 +110,11 @@ public class TestKeyValueHandler {
         .thenCallRealMethod();
   }
 
-  @Test
   /**
    * Test that Handler handles different command types correctly.
    */
-  public void testHandlerCommandHandling() throws Exception {
+  @Test
+  public void testHandlerCommandHandling() {
 
     // Test Create Container Request handling
     ContainerCommandRequestProto createContainerRequest =
@@ -276,14 +285,11 @@ public class TestKeyValueHandler {
 
   private ContainerCommandRequestProto getDummyCommandRequestProto(
       ContainerProtos.Type cmdType) {
-    ContainerCommandRequestProto request =
-        ContainerProtos.ContainerCommandRequestProto.newBuilder()
-            .setCmdType(cmdType)
-            .setContainerID(DUMMY_CONTAINER_ID)
-            .setDatanodeUuid(DATANODE_UUID)
-            .build();
-
-    return request;
+    return ContainerCommandRequestProto.newBuilder()
+        .setCmdType(cmdType)
+        .setContainerID(DUMMY_CONTAINER_ID)
+        .setDatanodeUuid(DATANODE_UUID)
+        .build();
   }
 
 
@@ -292,7 +298,7 @@ public class TestKeyValueHandler {
     long containerID = 1234L;
     Configuration conf = new Configuration();
     KeyValueContainerData kvData = new KeyValueContainerData(containerID,
-        ChunkLayOutVersion.FILE_PER_CHUNK,
+        layout,
         (long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
         UUID.randomUUID().toString());
     KeyValueContainer container = new KeyValueContainer(kvData, conf);
@@ -316,8 +322,7 @@ public class TestKeyValueHandler {
     ContainerProtos.ContainerCommandResponseProto response =
         handler.handleCloseContainer(closeContainerRequest, container);
 
-    Assert.assertTrue("Close container should return Invalid container error",
-        response.getResult().equals(
-            ContainerProtos.Result.INVALID_CONTAINER_STATE));
+    assertEquals("Close container should return Invalid container error",
+        ContainerProtos.Result.INVALID_CONTAINER_STATE, response.getResult());
   }
 }
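
Note the switch above from a static @BeforeClass to a per-instance @Before: with the Parameterized runner, @BeforeClass still runs only once for the whole parameter matrix, so moving setup to @Before guarantees fresh mocks for every layout run. The general shape of the constraint (an illustrative sketch, not code from the patch):

    import java.util.Arrays;
    import org.junit.Assert;
    import org.junit.Before;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;

    @RunWith(Parameterized.class)
    public class PerInstanceSetupSketch {

      private final String layout;
      private String handler;  // per-instance state

      public PerInstanceSetupSketch(String layout) {
        this.layout = layout;
      }

      @Parameterized.Parameters
      public static Iterable<Object[]> parameters() {
        return Arrays.asList(new Object[] {"A"}, new Object[] {"B"});
      }

      // Must be @Before, not @BeforeClass: it touches instance fields,
      // and each parameter set gets its own instance.
      @Before
      public void setup() {
        handler = "handler-for-" + layout;
      }

      @Test
      public void handlerIsPreparedPerRun() {
        Assert.assertNotNull(handler);
      }
    }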
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
index c9ab4cc..bee77c7 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
@@ -52,6 +52,8 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.commons.compress.compressors.CompressorStreamFactory.GZIP;
@@ -59,6 +61,7 @@ import static org.apache.commons.compress.compressors.CompressorStreamFactory.GZ
 /**
  * Test the tar/untar for a given container.
  */
+@RunWith(Parameterized.class)
 public class TestTarContainerPacker {
 
   private static final String TEST_DB_FILE_NAME = "test1";
@@ -85,6 +88,17 @@ public class TestTarContainerPacker {
 
   private static final AtomicInteger CONTAINER_ID = new AtomicInteger(1);
 
+  private final ChunkLayOutVersion layout;
+
+  public TestTarContainerPacker(ChunkLayOutVersion layout) {
+    this.layout = layout;
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> parameters() {
+    return ChunkLayoutTestInfo.chunkLayoutParameters();
+  }
+
   @BeforeClass
   public static void init() throws IOException {
     initDir(SOURCE_CONTAINER_ROOT);
@@ -116,7 +130,7 @@ public class TestTarContainerPacker {
     Files.createDirectories(dataDir);
 
     KeyValueContainerData containerData = new KeyValueContainerData(
-        id, ChunkLayOutVersion.FILE_PER_CHUNK,
+        id, layout,
         -1, UUID.randomUUID().toString(), UUID.randomUUID().toString());
     containerData.setChunksPath(dataDir.toString());
     containerData.setMetadataPath(dbDir.getParent().toString());
@@ -369,4 +383,4 @@ public class TestTarContainerPacker {
     }
   }
 
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/AbstractTestChunkManager.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/AbstractTestChunkManager.java
new file mode 100644
index 0000000..705bf14
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/AbstractTestChunkManager.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.keyvalue.impl;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Helpers for ChunkManager implementation tests.
+ */
+public abstract class AbstractTestChunkManager {
+
+  private HddsVolume hddsVolume;
+  private KeyValueContainerData keyValueContainerData;
+  private KeyValueContainer keyValueContainer;
+  private BlockID blockID;
+  private ChunkInfo chunkInfo;
+  private ByteBuffer data;
+  private byte[] header;
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  protected abstract ChunkLayoutTestInfo getStrategy();
+
+  protected ChunkManager createTestSubject() {
+    return getStrategy().createChunkManager(true);
+  }
+
+  @Before
+  public final void setUp() throws Exception {
+    OzoneConfiguration config = new OzoneConfiguration();
+    getStrategy().updateConfig(config);
+    UUID datanodeId = UUID.randomUUID();
+    hddsVolume = new HddsVolume.Builder(folder.getRoot()
+        .getAbsolutePath()).conf(config).datanodeUuid(datanodeId
+        .toString()).build();
+
+    VolumeSet volumeSet = mock(MutableVolumeSet.class);
+
+    RoundRobinVolumeChoosingPolicy volumeChoosingPolicy =
+        mock(RoundRobinVolumeChoosingPolicy.class);
+    Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
+        .thenReturn(hddsVolume);
+
+    keyValueContainerData = new KeyValueContainerData(1L,
+        ChunkLayOutVersion.getConfiguredVersion(config),
+        (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
+        datanodeId.toString());
+
+    keyValueContainer = new KeyValueContainer(keyValueContainerData, config);
+
+    keyValueContainer.create(volumeSet, volumeChoosingPolicy,
+        UUID.randomUUID().toString());
+
+    header = "my header".getBytes(UTF_8);
+    byte[] bytes = "testing write chunks".getBytes(UTF_8);
+    data = ByteBuffer.allocate(header.length + bytes.length)
+        .put(header).put(bytes);
+    rewindBufferToDataStart();
+
+    // Creating BlockData
+    blockID = new BlockID(1L, 1L);
+    chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
+        .getLocalID(), 0), 0, bytes.length);
+  }
+
+  protected DispatcherContext getDispatcherContext() {
+    return new DispatcherContext.Builder().build();
+  }
+
+  protected Buffer rewindBufferToDataStart() {
+    return data.position(header.length);
+  }
+
+  protected void checkChunkFileCount(int expected) {
+    // The container was created in setUp, so the chunks path should exist.
+    String path = keyValueContainerData.getChunksPath();
+    assertNotNull(path);
+
+    File dir = new File(path);
+    assertTrue(dir.exists());
+
+    File[] files = dir.listFiles();
+    assertNotNull(files);
+    assertEquals(expected, files.length);
+  }
+
+  protected void checkWriteIOStats(long length, long opCount) {
+    VolumeIOStats volumeIOStats = hddsVolume.getVolumeIOStats();
+    assertEquals(length, volumeIOStats.getWriteBytes());
+    assertEquals(opCount, volumeIOStats.getWriteOpCount());
+  }
+
+  protected void checkReadIOStats(long length, long opCount) {
+    VolumeIOStats volumeIOStats = hddsVolume.getVolumeIOStats();
+    assertEquals(length, volumeIOStats.getReadBytes());
+    assertEquals(opCount, volumeIOStats.getReadOpCount());
+  }
+
+  protected HddsVolume getHddsVolume() {
+    return hddsVolume;
+  }
+
+  protected KeyValueContainerData getKeyValueContainerData() {
+    return keyValueContainerData;
+  }
+
+  protected KeyValueContainer getKeyValueContainer() {
+    return keyValueContainer;
+  }
+
+  protected BlockID getBlockID() {
+    return blockID;
+  }
+
+  protected ChunkInfo getChunkInfo() {
+    return chunkInfo;
+  }
+
+  protected ByteBuffer getData() {
+    return data;
+  }
+
+}
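
The data buffer built in setUp deliberately carries a throwaway header ahead of the payload, and rewindBufferToDataStart() positions the buffer just past that header. Every test that writes getData() therefore verifies that a ChunkManager honors the buffer's current position instead of writing from index 0. The trick in isolation (a standalone sketch):

    import java.nio.ByteBuffer;
    import static java.nio.charset.StandardCharsets.UTF_8;

    public final class BufferPositionSketch {
      public static void main(String[] args) {
        byte[] header = "my header".getBytes(UTF_8);
        byte[] payload = "testing write chunks".getBytes(UTF_8);
        ByteBuffer data = ByteBuffer.allocate(header.length + payload.length)
            .put(header).put(payload);

        data.position(header.length);          // rewindBufferToDataStart()
        System.out.println(data.remaining());  // 20, the payload length
        // A correct ChunkManager writes exactly these remaining bytes,
        // never the header bytes before the current position.
      }
    }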
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/CommonChunkManagerTestCases.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/CommonChunkManagerTestCases.java
new file mode 100644
index 0000000..c621028
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/CommonChunkManagerTestCases.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.keyvalue.impl;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Common test cases for ChunkManager implementation tests.
+ */
+public abstract class CommonChunkManagerTestCases
+    extends AbstractTestChunkManager {
+
+  @Test
+  public void testWriteChunkIncorrectLength() {
+    // GIVEN
+    ChunkManager chunkManager = createTestSubject();
+    try {
+      long randomLength = 200L;
+      BlockID blockID = getBlockID();
+      ChunkInfo chunkInfo = new ChunkInfo(
+          String.format("%d.data.%d", blockID.getLocalID(), 0),
+          0, randomLength);
+
+      chunkManager.writeChunk(getKeyValueContainer(), blockID, chunkInfo,
+          getData(),
+          getDispatcherContext());
+
+      // THEN
+      fail("testWriteChunkIncorrectLength failed");
+    } catch (StorageContainerException ex) {
+      // As we got an exception, writeBytes should be 0.
+      checkWriteIOStats(0, 0);
+      GenericTestUtils.assertExceptionContains("Unexpected buffer size", ex);
+      assertEquals(ContainerProtos.Result.INVALID_WRITE_SIZE, ex.getResult());
+    }
+  }
+
+  @Test
+  public void testWriteChunkStageCombinedData() throws Exception {
+    // GIVEN
+    ChunkManager chunkManager = createTestSubject();
+    checkChunkFileCount(0);
+    checkWriteIOStats(0, 0);
+
+    chunkManager.writeChunk(getKeyValueContainer(), getBlockID(),
+        getChunkInfo(), getData(),
+        getDispatcherContext());
+
+    // THEN
+    checkChunkFileCount(1);
+    checkWriteIOStats(getChunkInfo().getLen(), 1);
+  }
+
+  @Test
+  public void testWriteReadChunk() throws Exception {
+    // GIVEN
+    ChunkManager chunkManager = createTestSubject();
+    checkWriteIOStats(0, 0);
+    DispatcherContext dispatcherContext = getDispatcherContext();
+    KeyValueContainer container = getKeyValueContainer();
+    BlockID blockID = getBlockID();
+    ChunkInfo chunkInfo = getChunkInfo();
+
+    chunkManager.writeChunk(container, blockID,
+        chunkInfo, getData(),
+        dispatcherContext);
+
+    checkWriteIOStats(chunkInfo.getLen(), 1);
+    checkReadIOStats(0, 0);
+
+    ByteBuffer expectedData = chunkManager
+        .readChunk(container, blockID, chunkInfo, dispatcherContext)
+        .toByteString().asReadOnlyByteBuffer();
+
+    // THEN
+    assertEquals(chunkInfo.getLen(), expectedData.remaining());
+    assertEquals(expectedData.rewind(), rewindBufferToDataStart());
+    checkReadIOStats(expectedData.limit(), 1);
+  }
+
+  @Test
+  public void testDeleteChunk() throws Exception {
+    // GIVEN
+    ChunkManager chunkManager = createTestSubject();
+    chunkManager.writeChunk(getKeyValueContainer(), getBlockID(),
+        getChunkInfo(), getData(),
+        getDispatcherContext());
+    checkChunkFileCount(1);
+
+    chunkManager.deleteChunk(getKeyValueContainer(), getBlockID(),
+        getChunkInfo());
+
+    // THEN
+    checkChunkFileCount(0);
+  }
+
+  @Test
+  public void testDeletePartialChunkUnsupportedRequest() {
+    // GIVEN
+    ChunkManager chunkManager = createTestSubject();
+    try {
+      chunkManager.writeChunk(getKeyValueContainer(), getBlockID(),
+          getChunkInfo(), getData(),
+          getDispatcherContext());
+      long randomLength = 200L;
+      ChunkInfo chunkInfo = new ChunkInfo(String.format("%d.data.%d",
+          getBlockID().getLocalID(), 0), 0, randomLength);
+
+      // WHEN
+      chunkManager.deleteChunk(getKeyValueContainer(), getBlockID(),
+          chunkInfo);
+
+      // THEN
+      fail("testDeleteChunkUnsupportedRequest");
+    } catch (StorageContainerException ex) {
+      assertEquals(ContainerProtos.Result.UNSUPPORTED_REQUEST, ex.getResult());
+    }
+  }
+
+  @Test
+  public void testReadChunkFileNotExists() {
+    // GIVEN
+    ChunkManager chunkManager = createTestSubject();
+    try {
+
+      // WHEN
+      chunkManager.readChunk(getKeyValueContainer(),
+          getBlockID(), getChunkInfo(), getDispatcherContext());
+
+      // THEN
+      fail("testReadChunkFileNotExists failed");
+    } catch (StorageContainerException ex) {
+      assertEquals(ContainerProtos.Result.UNABLE_TO_FIND_CHUNK, ex.getResult());
+    }
+  }
+
+  @Test
+  public void testWriteAndReadChunkMultipleTimes() throws Exception {
+    // GIVEN
+    ChunkManager chunkManager = createTestSubject();
+    KeyValueContainer container = getKeyValueContainer();
+    BlockID blockID = getBlockID();
+    long localID = blockID.getLocalID();
+    long len = getChunkInfo().getLen();
+    int count = 100;
+    ByteBuffer data = getData();
+    DispatcherContext context = getDispatcherContext();
+
+    // WHEN
+    for (int i = 0; i< count; i++) {
+      ChunkInfo info = new ChunkInfo(String.format("%d.data.%d", localID, i),
+          i * len, len);
+      chunkManager.writeChunk(container, blockID, info, data, context);
+      rewindBufferToDataStart();
+    }
+
+    // THEN
+    checkWriteIOStats(len * count, count);
+    assertTrue(getHddsVolume().getVolumeIOStats().getWriteTime() > 0);
+
+    // WHEN
+    for (int i = 0; i< count; i++) {
+      ChunkInfo info = new ChunkInfo(String.format("%d.data.%d", localID, i),
+          i * len, len);
+      chunkManager.readChunk(container, blockID, info, context);
+    }
+
+    // THEN
+    checkReadIOStats(len * count, count);
+  }
+
+}
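
Per the file list at the top of this commit, the concrete suites (TestFilePerChunkStrategy, TestFilePerBlockStrategy, TestChunkManagerDummyImpl) plug into this hierarchy by choosing an enum constant; their exact bodies are not shown in this excerpt, but the minimal wiring would look like this sketch:

    package org.apache.hadoop.ozone.container.keyvalue.impl;

    import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;

    /**
     * Sketch: inherits every common test case and runs it against the
     * file-per-chunk strategy; implementation-specific tests can be
     * declared alongside the inherited ones.
     */
    public class FilePerChunkSuiteSketch extends CommonChunkManagerTestCases {

      @Override
      protected ChunkLayoutTestInfo getStrategy() {
        return ChunkLayoutTestInfo.FILE_PER_CHUNK;
      }
    }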
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java
index 09ec6b7..3bba85b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -38,6 +39,8 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.mockito.Mockito;
 
 import java.util.ArrayList;
@@ -55,6 +58,7 @@ import static org.mockito.Mockito.mock;
 /**
  * This class is used to test key related operations on the container.
  */
+@RunWith(Parameterized.class)
 public class TestBlockManagerImpl {
 
   @Rule
@@ -69,6 +73,17 @@ public class TestBlockManagerImpl {
   private BlockManagerImpl blockManager;
   private BlockID blockID;
 
+  private final ChunkLayOutVersion layout;
+
+  public TestBlockManagerImpl(ChunkLayOutVersion layout) {
+    this.layout = layout;
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> parameters() {
+    return ChunkLayoutTestInfo.chunkLayoutParameters();
+  }
+
   @Before
   public void setUp() throws Exception {
     config = new OzoneConfiguration();
@@ -84,7 +99,7 @@ public class TestBlockManagerImpl {
         .thenReturn(hddsVolume);
 
     keyValueContainerData = new KeyValueContainerData(1L,
-        ChunkLayOutVersion.FILE_PER_CHUNK,
+        layout,
         (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
         datanodeId.toString());
 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestChunkManager.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestChunkManager.java
deleted file mode 100644
index d8faacc..0000000
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestChunkManager.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.keyvalue.impl;
-
-import org.apache.hadoop.conf.StorageUnit;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.common.ChunkBuffer;
-import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
-import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
-import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
-import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats;
-import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.mockito.Mockito;
-
-import java.io.File;
-import java.nio.Buffer;
-import java.nio.ByteBuffer;
-import java.util.UUID;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion.FILE_PER_CHUNK;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.mock;
-
-/**
- * This class is used to test ChunkManager operations.
- */
-// TODO test common behavior for various implementations
-// TODO extract implementation-specific tests
-public class TestChunkManager {
-
-  private OzoneConfiguration config;
-  private String scmId = UUID.randomUUID().toString();
-  private VolumeSet volumeSet;
-  private RoundRobinVolumeChoosingPolicy volumeChoosingPolicy;
-  private HddsVolume hddsVolume;
-  private KeyValueContainerData keyValueContainerData;
-  private KeyValueContainer keyValueContainer;
-  private BlockID blockID;
-  private ChunkManagerDispatcher chunkManager;
-  private ChunkInfo chunkInfo;
-  private ByteBuffer data;
-
-  @Rule
-  public TemporaryFolder folder = new TemporaryFolder();
-  private byte[] header;
-
-  @Before
-  public void setUp() throws Exception {
-    config = new OzoneConfiguration();
-    UUID datanodeId = UUID.randomUUID();
-    hddsVolume = new HddsVolume.Builder(folder.getRoot()
-        .getAbsolutePath()).conf(config).datanodeUuid(datanodeId
-        .toString()).build();
-
-    volumeSet = mock(MutableVolumeSet.class);
-
-    volumeChoosingPolicy = mock(RoundRobinVolumeChoosingPolicy.class);
-    Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
-        .thenReturn(hddsVolume);
-
-    keyValueContainerData = new KeyValueContainerData(1L, FILE_PER_CHUNK,
-        (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
-        datanodeId.toString());
-
-    keyValueContainer = new KeyValueContainer(keyValueContainerData, config);
-
-    keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
-
-    header = "my header".getBytes(UTF_8);
-    byte[] bytes = "testing write chunks".getBytes(UTF_8);
-    data = ByteBuffer.allocate(header.length + bytes.length)
-        .put(header).put(bytes);
-    rewindBufferToDataStart();
-
-    // Creating BlockData
-    blockID = new BlockID(1L, 1L);
-    chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
-        .getLocalID(), 0), 0, bytes.length);
-
-    // Create a ChunkManager object.
-    chunkManager = new ChunkManagerDispatcher(true);
-  }
-
-  private DispatcherContext getDispatcherContext() {
-    return new DispatcherContext.Builder().build();
-  }
-
-  @Test
-  public void testWriteChunkStageWriteAndCommit() throws Exception {
-    checkChunkFileCount(0);
-
-    // As no chunks are written to the volume writeBytes should be 0
-    checkWriteIOStats(0, 0);
-    chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
-        new DispatcherContext.Builder()
-            .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA).build());
-    // Now a chunk file is being written with Stage WRITE_DATA, so it should
-    // create a temporary chunk file.
-    checkChunkFileCount(1);
-
-    long term = 0;
-    long index = 0;
-    File chunkFile = ChunkUtils.getChunkFile(keyValueContainerData, chunkInfo);
-    File tempChunkFile = new File(chunkFile.getParent(),
-        chunkFile.getName() + OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER
-            + OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX
-            + OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER + term
-            + OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER + index);
-
-    // As chunk write stage is WRITE_DATA, temp chunk file will be created.
-    assertTrue(tempChunkFile.exists());
-
-    checkWriteIOStats(chunkInfo.getLen(), 1);
-
-    chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
-        new DispatcherContext.Builder()
-            .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA).build());
-
-    checkWriteIOStats(chunkInfo.getLen(), 1);
-
-    // Old temp file should have been renamed to chunk file.
-    checkChunkFileCount(1);
-
-    // As commit happened, chunk file should exist.
-    assertTrue(chunkFile.exists());
-    assertFalse(tempChunkFile.exists());
-  }
-
-  @Test
-  public void testWriteChunkIncorrectLength() {
-    try {
-      long randomLength = 200L;
-      chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
-          .getLocalID(), 0), 0, randomLength);
-      chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
-          getDispatcherContext());
-      fail("testWriteChunkIncorrectLength failed");
-    } catch (StorageContainerException ex) {
-      // As we got an exception, writeBytes should be 0.
-      checkWriteIOStats(0, 0);
-      GenericTestUtils.assertExceptionContains("Unexpected buffer size", ex);
-      assertEquals(ContainerProtos.Result.INVALID_WRITE_SIZE, ex.getResult());
-    }
-  }
-
-  @Test
-  public void testWriteChunkStageCombinedData() throws Exception {
-    checkChunkFileCount(0);
-    checkWriteIOStats(0, 0);
-    chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
-        getDispatcherContext());
-    // Now a chunk file is being written with Stage COMBINED_DATA, so it should
-    // create a chunk file.
-    checkChunkFileCount(1);
-    File chunkFile = ChunkUtils.getChunkFile(keyValueContainerData, chunkInfo);
-    assertTrue(chunkFile.exists());
-    checkWriteIOStats(chunkInfo.getLen(), 1);
-  }
-
-  @Test
-  public void testReadChunk() throws Exception {
-    checkWriteIOStats(0, 0);
-    DispatcherContext dispatcherContext = getDispatcherContext();
-    chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
-        dispatcherContext);
-    checkWriteIOStats(chunkInfo.getLen(), 1);
-    checkReadIOStats(0, 0);
-    ByteBuffer expectedData = chunkManager
-        .readChunk(keyValueContainer, blockID, chunkInfo, dispatcherContext)
-        .toByteString().asReadOnlyByteBuffer();
-    assertEquals(chunkInfo.getLen(), expectedData.remaining());
-    assertEquals(expectedData.rewind(), rewindBufferToDataStart());
-    checkReadIOStats(expectedData.limit(), 1);
-  }
-
-  @Test
-  public void testDeleteChunk() throws Exception {
-    chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
-        getDispatcherContext());
-    checkChunkFileCount(1);
-
-    chunkManager.deleteChunk(keyValueContainer, blockID, chunkInfo);
-
-    checkChunkFileCount(0);
-  }
-
-  @Test
-  public void testDeleteChunkUnsupportedRequest() {
-    try {
-      chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
-          getDispatcherContext());
-      long randomLength = 200L;
-      chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
-          .getLocalID(), 0), 0, randomLength);
-      chunkManager.deleteChunk(keyValueContainer, blockID, chunkInfo);
-      fail("testDeleteChunkUnsupportedRequest");
-    } catch (StorageContainerException ex) {
-      assertEquals(ContainerProtos.Result.UNSUPPORTED_REQUEST, ex.getResult());
-    }
-  }
-
-  @Test
-  public void testReadChunkFileNotExists() {
-    try {
-      // trying to read a chunk, where chunk file does not exist
-      chunkManager.readChunk(keyValueContainer,
-          blockID, chunkInfo, getDispatcherContext());
-      fail("testReadChunkFileNotExists failed");
-    } catch (StorageContainerException ex) {
-      assertEquals(ContainerProtos.Result.UNABLE_TO_FIND_CHUNK, ex.getResult());
-    }
-  }
-
-  @Test
-  public void testWriteAndReadChunkMultipleTimes() throws Exception {
-    for (int i=0; i<100; i++) {
-      ChunkInfo info = new ChunkInfo(
-          String.format("%d.data.%d", blockID.getLocalID(), i),
-          0, this.chunkInfo.getLen());
-      chunkManager.writeChunk(keyValueContainer, blockID, info, data,
-          getDispatcherContext());
-      rewindBufferToDataStart();
-    }
-    checkWriteIOStats(chunkInfo.getLen()*100, 100);
-    assertTrue(hddsVolume.getVolumeIOStats().getWriteTime() > 0);
-
-    for (int i=0; i<100; i++) {
-      ChunkInfo info = new ChunkInfo(
-          String.format("%d.data.%d", blockID.getLocalID(), i),
-          0, this.chunkInfo.getLen());
-      chunkManager.readChunk(keyValueContainer, blockID, info,
-          getDispatcherContext());
-    }
-    checkReadIOStats(chunkInfo.getLen()*100, 100);
-    assertTrue(hddsVolume.getVolumeIOStats().getReadTime() > 0);
-  }
-
-  @Test
-  public void dummyManagerDoesNotWriteToFile() throws Exception {
-    ChunkManager dummy = new ChunkManagerDummyImpl();
-    DispatcherContext ctx = new DispatcherContext.Builder()
-        .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA).build();
-
-    dummy.writeChunk(keyValueContainer, blockID, chunkInfo, data, ctx);
-
-    checkChunkFileCount(0);
-  }
-
-  @Test
-  public void dummyManagerReadsAnyChunk() throws Exception {
-    ChunkManager dummy = new ChunkManagerDummyImpl();
-
-    ChunkBuffer dataRead = dummy.readChunk(keyValueContainer,
-        blockID, chunkInfo, getDispatcherContext());
-
-    assertNotNull(dataRead);
-  }
-
-  private Buffer rewindBufferToDataStart() {
-    return data.position(header.length);
-  }
-
-  private void checkChunkFileCount(int expected) {
-    //As in Setup, we try to create container, these paths should exist.
-    String path = keyValueContainerData.getChunksPath();
-    assertNotNull(path);
-
-    File dir = new File(path);
-    assertTrue(dir.exists());
-
-    File[] files = dir.listFiles();
-    assertNotNull(files);
-    assertEquals(expected, files.length);
-  }
-
-  private void checkWriteIOStats(long length, long opCount) {
-    VolumeIOStats volumeIOStats = hddsVolume.getVolumeIOStats();
-    assertEquals(length, volumeIOStats.getWriteBytes());
-    assertEquals(opCount, volumeIOStats.getWriteOpCount());
-  }
-
-  private void checkReadIOStats(long length, long opCount) {
-    VolumeIOStats volumeIOStats = hddsVolume.getVolumeIOStats();
-    assertEquals(length, volumeIOStats.getReadBytes());
-    assertEquals(opCount, volumeIOStats.getReadOpCount());
-  }
-}
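
The 325-line TestChunkManager deleted above is replaced by a small template-method hierarchy: AbstractTestChunkManager holds the fixture, CommonChunkManagerTestCases holds the shared cases, and one subclass per implementation picks the strategy. A minimal, self-contained sketch of that layout follows; everything except the getStrategy()/createTestSubject() hooks visible in this diff is a placeholder, not the actual Ozone test API.

    import org.junit.Test;

    import static org.junit.Assert.assertNotNull;

    // Sketch only: the real base class also provisions volumes, a container
    // and test data before each case.
    abstract class AbstractStrategyTestSketch {

      // Mirrors getStrategy(): each concrete test class names one layout.
      protected abstract String getStrategy();

      // Mirrors createTestSubject(): builds the object under test.
      protected Object createTestSubject() {
        return "chunk-manager-for-" + getStrategy();
      }

      // Shared cases live in the base and run once per subclass.
      @Test
      public void subjectIsCreated() {
        assertNotNull(createTestSubject());
      }
    }

    // One subclass per strategy, as in the three new test files below.
    class FilePerBlockSketchTest extends AbstractStrategyTestSketch {
      @Override
      protected String getStrategy() {
        return "FILE_PER_BLOCK";
      }
    }
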
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestChunkManagerDummyImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestChunkManagerDummyImpl.java
new file mode 100644
index 0000000..d882ba4
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestChunkManagerDummyImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue.impl;
+
+import org.apache.hadoop.ozone.common.ChunkBuffer;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
+import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for ChunkManagerDummyImpl.
+ */
+public class TestChunkManagerDummyImpl extends AbstractTestChunkManager {
+
+  @Override
+  protected ChunkLayoutTestInfo getStrategy() {
+    return ChunkLayoutTestInfo.DUMMY;
+  }
+
+  @Test
+  public void dummyManagerDoesNotWriteToFile() throws Exception {
+    ChunkManager subject = createTestSubject();
+    DispatcherContext ctx = new DispatcherContext.Builder()
+        .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA).build();
+
+    subject.writeChunk(getKeyValueContainer(), getBlockID(), getChunkInfo(),
+        getData(), ctx);
+
+    checkChunkFileCount(0);
+  }
+
+  @Test
+  public void dummyManagerReadsAnyChunk() throws Exception {
+    ChunkManager dummy = createTestSubject();
+
+    ChunkBuffer dataRead = dummy.readChunk(getKeyValueContainer(),
+        getBlockID(), getChunkInfo(), getDispatcherContext());
+
+    assertNotNull(dataRead);
+  }
+}
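
The dummyManagerReadsAnyChunk case above holds because a no-op chunk manager can answer a read without ever touching disk. A standalone, JDK-only sketch of that idea (ChunkManagerDummyImpl's actual internals are outside this excerpt):

    import java.nio.ByteBuffer;

    public final class DummyReadSketch {

      // A no-op reader: fabricate a zero-filled buffer of the requested
      // length instead of opening any chunk file.
      static ByteBuffer readChunk(long requestedLen) {
        return ByteBuffer.allocate(Math.toIntExact(requestedLen));
      }

      public static void main(String[] args) {
        System.out.println(readChunk(16).remaining()); // 16, no file created
      }
    }
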
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java
new file mode 100644
index 0000000..8ccf9ee
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.keyvalue.impl;
+
+import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.ChunkBuffer;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
+import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+
+import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
+import static org.apache.hadoop.ozone.container.ContainerTestHelper.setDataChecksum;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Test for FilePerBlockStrategy.
+ */
+public class TestFilePerBlockStrategy extends CommonChunkManagerTestCases {
+
+  @Test
+  public void testDeletePartialChunkWithOffsetUnsupportedRequest() {
+    // GIVEN
+    ChunkManager chunkManager = createTestSubject();
+    try {
+      KeyValueContainer container = getKeyValueContainer();
+      BlockID blockID = getBlockID();
+      chunkManager.writeChunk(container, blockID,
+          getChunkInfo(), getData(), getDispatcherContext());
+      ChunkInfo chunkInfo = new ChunkInfo(String.format("%d.data.%d",
+          blockID.getLocalID(), 0), 123, getChunkInfo().getLen());
+
+      // WHEN
+      chunkManager.deleteChunk(container, blockID, chunkInfo);
+
+      // THEN
+      fail("testDeleteChunkUnsupportedRequest");
+    } catch (StorageContainerException ex) {
+      assertEquals(ContainerProtos.Result.UNSUPPORTED_REQUEST, ex.getResult());
+    }
+  }
+
+  /**
+   * This test writes the data in many small writes and reads it back
+   * in a single large read.
+   */
+  @Test
+  public void testMultipleWriteSingleRead() throws Exception {
+    final int datalen = 1024;
+    final int chunkCount = 1024;
+
+    KeyValueContainer container = getKeyValueContainer();
+    BlockID blockID = getBlockID();
+    MessageDigest oldSha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+    ChunkManager subject = createTestSubject();
+
+    for (int x = 0; x < chunkCount; x++) {
+      // we are writing to the same chunk file but at different offsets.
+      long offset = x * datalen;
+      ChunkInfo info = getChunk(
+          blockID.getLocalID(), 0, offset, datalen);
+      ChunkBuffer data = ContainerTestHelper.getData(datalen);
+      oldSha.update(data.toByteString().asReadOnlyByteBuffer());
+      data.rewind();
+      setDataChecksum(info, data);
+      subject.writeChunk(container, blockID, info, data,
+          getDispatcherContext());
+    }
+
+    // Request to read the whole data in a single go.
+    ChunkInfo largeChunk = getChunk(blockID.getLocalID(), 0, 0,
+        datalen * chunkCount);
+    ChunkBuffer chunk =
+        subject.readChunk(container, blockID, largeChunk,
+            getDispatcherContext());
+    ByteBuffer newdata = chunk.toByteString().asReadOnlyByteBuffer();
+    MessageDigest newSha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+    newSha.update(newdata);
+    assertEquals(Hex.encodeHexString(oldSha.digest()),
+        Hex.encodeHexString(newSha.digest()));
+  }
+
+  /**
+   * Test a partial read within a single chunk.
+   */
+  @Test
+  public void testPartialRead() throws Exception {
+    final int datalen = 1024;
+    final int start = datalen / 4;
+    final int length = datalen / 2;
+
+    KeyValueContainer container = getKeyValueContainer();
+    BlockID blockID = getBlockID();
+    ChunkInfo info = getChunk(blockID.getLocalID(), 0, 0, datalen);
+    ChunkBuffer data = ContainerTestHelper.getData(datalen);
+    setDataChecksum(info, data);
+    DispatcherContext ctx = getDispatcherContext();
+    ChunkManager subject = createTestSubject();
+    subject.writeChunk(container, blockID, info, data, ctx);
+
+    ChunkBuffer readData = subject.readChunk(container, blockID, info, ctx);
+    assertEquals(data.rewind(), readData.rewind());
+
+    ChunkInfo info2 = getChunk(blockID.getLocalID(), 0, start, length);
+    ChunkBuffer readData2 = subject.readChunk(container, blockID, info2, ctx);
+    assertEquals(length, info2.getLen());
+    assertEquals(data.duplicate(start, start + length), readData2.rewind());
+  }
+
+  @Override
+  protected ChunkLayoutTestInfo getStrategy() {
+    return ChunkLayoutTestInfo.FILE_PER_BLOCK;
+  }
+}
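
The testMultipleWriteSingleRead case above exercises the defining property of the file-per-block layout: every chunk write lands in one block file at a chunk-sized offset, and a single positional read returns the concatenation. A self-contained JDK illustration of the same mechanics (not the strategy's actual code):

    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    public final class OffsetWriteSketch {
      public static void main(String[] args) throws Exception {
        Path block = Files.createTempFile("block", ".data");
        try (FileChannel ch = FileChannel.open(block,
            StandardOpenOption.READ, StandardOpenOption.WRITE)) {
          byte[] piece = "chunk-".getBytes(StandardCharsets.UTF_8);
          // Many small writes, each at its own offset in the one file.
          for (int x = 0; x < 4; x++) {
            ch.write(ByteBuffer.wrap(piece), (long) x * piece.length);
          }
          // One large read covering everything written above.
          ByteBuffer all = ByteBuffer.allocate(4 * piece.length);
          ch.read(all, 0);
          System.out.println(all.position()); // 24: all four pieces
        } finally {
          Files.deleteIfExists(block);
        }
      }
    }
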
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerChunkStrategy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerChunkStrategy.java
new file mode 100644
index 0000000..d4515a8
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerChunkStrategy.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue.impl;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
+import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for FilePerChunkStrategy.
+ */
+public class TestFilePerChunkStrategy extends CommonChunkManagerTestCases {
+
+  @Override
+  protected ChunkLayoutTestInfo getStrategy() {
+    return ChunkLayoutTestInfo.FILE_PER_CHUNK;
+  }
+
+  @Test
+  public void testWriteChunkStageWriteAndCommit() throws Exception {
+    ChunkManager chunkManager = createTestSubject();
+
+    checkChunkFileCount(0);
+
+    // As no chunks have been written to the volume, writeBytes should be 0.
+    checkWriteIOStats(0, 0);
+    KeyValueContainer container = getKeyValueContainer();
+    BlockID blockID = getBlockID();
+    ChunkInfo chunkInfo = getChunkInfo();
+    chunkManager.writeChunk(container, blockID, chunkInfo, getData(),
+        new DispatcherContext.Builder()
+            .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA).build());
+    // Now a chunk file is being written with Stage WRITE_DATA, so it should
+    // create a temporary chunk file.
+    checkChunkFileCount(1);
+
+    long term = 0;
+    long index = 0;
+    File chunkFile = ChunkLayOutVersion.FILE_PER_CHUNK
+        .getChunkFile(container.getContainerData(), blockID, chunkInfo);
+    File tempChunkFile = new File(chunkFile.getParent(),
+        chunkFile.getName() + OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER
+            + OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX
+            + OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER + term
+            + OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER + index);
+
+    // As the chunk write stage is WRITE_DATA, a temp chunk file will be created.
+    assertTrue(tempChunkFile.exists());
+
+    checkWriteIOStats(chunkInfo.getLen(), 1);
+
+    chunkManager.writeChunk(container, blockID, chunkInfo, getData(),
+        new DispatcherContext.Builder()
+            .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA).build());
+
+    checkWriteIOStats(chunkInfo.getLen(), 1);
+
+    // Old temp file should have been renamed to chunk file.
+    checkChunkFileCount(1);
+
+    // As commit happened, chunk file should exist.
+    assertTrue(chunkFile.exists());
+    assertFalse(tempChunkFile.exists());
+  }
+}
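
The WRITE_DATA/COMMIT_DATA sequence asserted above is the classic write-to-temp-then-rename commit protocol. A standalone JDK sketch; the ".tmp.0.0" suffix mirrors the naming in the test only on the assumption that CONTAINER_CHUNK_NAME_DELIMITER is "." and CONTAINER_TEMPORARY_CHUNK_PREFIX is "tmp":

    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    public final class TempCommitSketch {
      public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("chunks");
        Path chunk = dir.resolve("1.data.0");
        Path tmp = dir.resolve(chunk.getFileName() + ".tmp.0.0");

        // WRITE_DATA stage: the payload goes to a side file first.
        Files.write(tmp, "payload".getBytes(StandardCharsets.UTF_8));

        // COMMIT_DATA stage: publish it atomically under the final name.
        Files.move(tmp, chunk, StandardCopyOption.ATOMIC_MOVE);

        // Mirrors the test's assertions: chunk exists, temp file is gone.
        System.out.println(Files.exists(chunk) + " " + !Files.exists(tmp));
      }
    }
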
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index 665274c..efcbb10 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
@@ -48,6 +49,8 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,6 +67,7 @@ import static org.junit.Assert.assertEquals;
 /**
  * This class is used to test OzoneContainer.
  */
+@RunWith(Parameterized.class)
 public class TestOzoneContainer {
 
   private static final Logger LOG =
@@ -82,6 +86,17 @@ public class TestOzoneContainer {
   private HashMap<String, Long> commitSpaceMap; //RootDir -> committed space
   private final int numTestContainers = 10;
 
+  private final ChunkLayOutVersion layout;
+
+  public TestOzoneContainer(ChunkLayOutVersion layout) {
+    this.layout = layout;
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> parameters() {
+    return ChunkLayoutTestInfo.chunkLayoutParameters();
+  }
+
   @Before
   public void setUp() throws Exception {
     conf = new OzoneConfiguration();
@@ -119,7 +134,7 @@ public class TestOzoneContainer {
       HddsVolume myVolume;
 
       keyValueContainerData = new KeyValueContainerData(i,
-          ChunkLayOutVersion.FILE_PER_CHUNK,
+          layout,
           maxCap, UUID.randomUUID().toString(),
           datanodeDetails.getUuidString());
       keyValueContainer = new KeyValueContainer(
@@ -167,7 +182,7 @@ public class TestOzoneContainer {
       volume.incCommittedBytes(10);
     }
     keyValueContainerData = new KeyValueContainerData(99,
-        ChunkLayOutVersion.FILE_PER_CHUNK, containerSize,
+        layout, containerSize,
         UUID.randomUUID().toString(), datanodeDetails.getUuidString());
     keyValueContainer = new KeyValueContainer(keyValueContainerData, conf);
 
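
TestOzoneContainer above and TestReplicationSupervisor below are both converted to JUnit's Parameterized runner, so every case now runs once per chunk layout. A minimal, self-contained version of the pattern, with string stand-ins in place of ChunkLayoutTestInfo.chunkLayoutParameters():

    import java.util.Arrays;

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;

    import static org.junit.Assert.assertTrue;

    @RunWith(Parameterized.class)
    public class LayoutParameterSketchTest {

      private final String layout;

      public LayoutParameterSketchTest(String layout) {
        this.layout = layout; // injected once per parameter row
      }

      @Parameterized.Parameters
      public static Iterable<Object[]> parameters() {
        // Stand-in values; the real tests return chunkLayoutParameters().
        return Arrays.asList(
            new Object[] {"FILE_PER_CHUNK"},
            new Object[] {"FILE_PER_BLOCK"});
      }

      @Test
      public void runsOncePerLayout() {
        assertTrue(layout.startsWith("FILE_PER"));
      }
    }
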
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
index 55bbe02..b7415fa 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -29,6 +29,7 @@ import java.util.function.Function;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 
@@ -36,6 +37,8 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import javax.annotation.Nonnull;
 
@@ -45,6 +48,7 @@ import static java.util.Collections.emptyList;
 /**
  * Test the replication supervisor.
  */
+@RunWith(Parameterized.class)
 public class TestReplicationSupervisor {
 
   private final ContainerReplicator noopReplicator = task -> {};
@@ -58,6 +62,17 @@ public class TestReplicationSupervisor {
 
   private ContainerSet set;
 
+  private final ChunkLayOutVersion layout;
+
+  public TestReplicationSupervisor(ChunkLayOutVersion layout) {
+    this.layout = layout;
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> parameters() {
+    return ChunkLayoutTestInfo.chunkLayoutParameters();
+  }
+
   @Before
   public void setUp() throws Exception {
     set = new ContainerSet();
@@ -159,8 +174,7 @@ public class TestReplicationSupervisor {
   }
 
   private ReplicationSupervisor supervisorWithSuccessfulReplicator() {
-    return supervisorWith(supervisor -> new FakeReplicator(supervisor, set),
-        newDirectExecutorService());
+    return supervisorWith(FakeReplicator::new, newDirectExecutorService());
   }
 
   private ReplicationSupervisor supervisorWith(
@@ -175,33 +189,31 @@ public class TestReplicationSupervisor {
   /**
    * A fake replicator that simulates successful download of containers.
    */
-  private static class FakeReplicator implements ContainerReplicator {
+  private class FakeReplicator implements ContainerReplicator {
 
     private final OzoneConfiguration conf = new OzoneConfiguration();
     private final ReplicationSupervisor supervisor;
-    private final ContainerSet containerSet;
 
-    FakeReplicator(ReplicationSupervisor supervisor, ContainerSet set) {
+    FakeReplicator(ReplicationSupervisor supervisor) {
       this.supervisor = supervisor;
-      this.containerSet = set;
     }
 
     @Override
     public void replicate(ReplicationTask task) {
-      Assert.assertNull(containerSet.getContainer(task.getContainerId()));
+      Assert.assertNull(set.getContainer(task.getContainerId()));
 
       // assumes same-thread execution
       Assert.assertEquals(1, supervisor.getInFlightReplications());
 
       KeyValueContainerData kvcd =
           new KeyValueContainerData(task.getContainerId(),
-              ChunkLayOutVersion.FILE_PER_CHUNK, 100L,
+              layout, 100L,
               UUID.randomUUID().toString(), UUID.randomUUID().toString());
       KeyValueContainer kvc =
           new KeyValueContainer(kvcd, conf);
 
       try {
-        containerSet.addContainer(kvc);
+        set.addContainer(kvc);
         task.setStatus(ReplicationTask.Status.DONE);
       } catch (Exception e) {
         Assert.fail("Unexpected error: " + e.getMessage());
@@ -245,4 +257,4 @@ public class TestReplicationSupervisor {
       // ignore all tasks
     }
   }
-}
\ No newline at end of file
+}

