Posted to commits@ozone.apache.org by lj...@apache.org on 2021/01/06 14:47:19 UTC

[ozone] branch master updated (8146368 -> 431e909)

This is an automated email from the ASF dual-hosted git repository.

ljain pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git.


    from 8146368  HDDS-4419. Misleading SCM web UI Safe mode status  (#1599)
     new 628c8b6  Revert "HDDS-4369. Datanode should store the delete transaction as is in rocksDB (#1702)"
     new 431e909  HDDS-4369. Datanode should store the delete transaction as is in rocksDB (#1702)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org


[ozone] 01/02: Revert "HDDS-4369. Datanode should store the delete transaction as is in rocksDB (#1702)"

Posted by lj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 628c8b695bed0d890b5ac8bba096c8f8a9d60c13
Author: Lokesh Jain <lj...@apache.org>
AuthorDate: Wed Jan 6 19:51:21 2021 +0530

    Revert "HDDS-4369. Datanode should store the delete transaction as is in rocksDB (#1702)"
    
    This reverts commit 86574d1f80c4a264b711d2c279c9b7471c3149d0.
---
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   6 +-
 .../commandhandler/DeleteBlocksCommandHandler.java | 191 ++++++--------
 .../background/BlockDeletingService.java           | 171 ++-----------
 .../metadata/AbstractDatanodeDBDefinition.java     |   2 +-
 .../metadata/DatanodeSchemaOneDBDefinition.java    |   5 -
 .../metadata/DatanodeSchemaTwoDBDefinition.java    |  32 +--
 .../metadata/DatanodeStoreSchemaTwoImpl.java       |  14 +-
 .../metadata/DeletedBlocksTransactionCodec.java    |  46 ----
 .../container/common/TestBlockDeletingService.java | 279 ++++-----------------
 .../hadoop/ozone/TestStorageContainerManager.java  |  13 +-
 .../ozone/TestStorageContainerManagerHelper.java   |  30 ---
 11 files changed, 157 insertions(+), 632 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 9836452..d2d6e35 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -254,15 +254,11 @@ public final class OzoneConsts {
   // versions, requiring this property to be tracked on a per container basis.
   // V1: All data in default column family.
   public static final String SCHEMA_V1 = "1";
-  // V2: Metadata, block data, and delete transactions in their own
-  // column families.
+  // V2: Metadata, block data, and deleted blocks in their own column families.
   public static final String SCHEMA_V2 = "2";
   // Most recent schema version that all new containers should be created with.
   public static final String SCHEMA_LATEST = SCHEMA_V2;
 
-  public static final String[] SCHEMA_VERSIONS =
-      new String[] {SCHEMA_V1, SCHEMA_V2};
-
   // Supported store types.
   public static final String OZONE = "ozone";
   public static final String S3 = "s3";
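
For context, the SCHEMA_V1/SCHEMA_V2 constants kept above are consumed by
version dispatch elsewhere in this patch. A minimal sketch of that dispatch,
assuming a KeyValueContainerData instance named containerData is in scope:

    // Hedged sketch only: mirrors the per-schema branching that this revert
    // removes from DeleteBlocksCommandHandler further down in this patch.
    String schemaVersion = containerData.getSchemaVersion();
    if (schemaVersion.equals(OzoneConsts.SCHEMA_V1)) {
      // V1: all data lives in the default column family.
    } else if (schemaVersion.equals(OzoneConsts.SCHEMA_V2)) {
      // V2: block data, metadata, and deleted blocks in separate families.
    } else {
      throw new UnsupportedOperationException(
          "Unsupported schema version: " + schemaVersion);
    }
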
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index 10e6797..91ab4c9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -42,8 +42,6 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.statemachine
     .SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
-import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlockCommandStatus;
@@ -61,8 +59,6 @@ import java.util.function.Consumer;
 
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.CONTAINER_NOT_FOUND;
-import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1;
-import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2;
 
 /**
  * Handle block deletion commands.
@@ -120,7 +116,6 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
             DeleteBlockTransactionResult.newBuilder();
         txResultBuilder.setTxID(entry.getTxID());
         long containerId = entry.getContainerID();
-        int newDeletionBlocks = 0;
         try {
           Container cont = containerSet.getContainer(containerId);
           if (cont == null) {
@@ -134,16 +129,7 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
                 cont.getContainerData();
             cont.writeLock();
             try {
-              if (containerData.getSchemaVersion().equals(SCHEMA_V1)) {
-                markBlocksForDeletionSchemaV1(containerData, entry);
-              } else if (containerData.getSchemaVersion().equals(SCHEMA_V2)) {
-                markBlocksForDeletionSchemaV2(containerData, entry,
-                    newDeletionBlocks, entry.getTxID());
-              } else {
-                throw new UnsupportedOperationException(
-                    "Only schema version 1 and schema version 2 are "
-                        + "supported.");
-              }
+              deleteKeyValueContainerBlocks(containerData, entry);
             } finally {
               cont.writeUnlock();
             }
@@ -201,140 +187,107 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
    * @param delTX a block deletion transaction.
    * @throws IOException if I/O error occurs.
    */
-
-  private void markBlocksForDeletionSchemaV2(
-      KeyValueContainerData containerData, DeletedBlocksTransaction delTX,
-      int newDeletionBlocks, long txnID) throws IOException {
-    long containerId = delTX.getContainerID();
-    if (!isTxnIdValid(containerId, containerData, delTX)) {
-      return;
-    }
-    try (ReferenceCountedDB containerDB = BlockUtils
-        .getDB(containerData, conf)) {
-      DatanodeStore ds = containerDB.getStore();
-      DatanodeStoreSchemaTwoImpl dnStoreTwoImpl =
-          (DatanodeStoreSchemaTwoImpl) ds;
-      Table<Long, DeletedBlocksTransaction> delTxTable =
-          dnStoreTwoImpl.getDeleteTransactionTable();
-      try (BatchOperation batch = containerDB.getStore().getBatchHandler()
-          .initBatchOperation()) {
-        delTxTable.putWithBatch(batch, txnID, delTX);
-        newDeletionBlocks += delTX.getLocalIDList().size();
-        updateMetaData(containerData, delTX, newDeletionBlocks, containerDB,
-            batch);
-        containerDB.getStore().getBatchHandler().commitBatchOperation(batch);
-      }
-    }
-  }
-
-  private void markBlocksForDeletionSchemaV1(
+  private void deleteKeyValueContainerBlocks(
       KeyValueContainerData containerData, DeletedBlocksTransaction delTX)
       throws IOException {
     long containerId = delTX.getContainerID();
-    if (!isTxnIdValid(containerId, containerData, delTX)) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing Container : {}, DB path : {}", containerId,
+          containerData.getMetadataPath());
+    }
+
+    if (delTX.getTxID() < containerData.getDeleteTransactionId()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Ignoring delete blocks for containerId: %d."
+                + " Outdated delete transactionId %d < %d", containerId,
+            delTX.getTxID(), containerData.getDeleteTransactionId()));
+      }
       return;
     }
+
     int newDeletionBlocks = 0;
-    try (ReferenceCountedDB containerDB = BlockUtils
-        .getDB(containerData, conf)) {
+    try(ReferenceCountedDB containerDB =
+            BlockUtils.getDB(containerData, conf)) {
       Table<String, BlockData> blockDataTable =
-          containerDB.getStore().getBlockDataTable();
+              containerDB.getStore().getBlockDataTable();
       Table<String, ChunkInfoList> deletedBlocksTable =
-          containerDB.getStore().getDeletedBlocksTable();
+              containerDB.getStore().getDeletedBlocksTable();
 
-      try (BatchOperation batch = containerDB.getStore().getBatchHandler()
-          .initBatchOperation()) {
-        for (Long blkLong : delTX.getLocalIDList()) {
-          String blk = blkLong.toString();
-          BlockData blkInfo = blockDataTable.get(blk);
-          if (blkInfo != null) {
-            String deletingKey = OzoneConsts.DELETING_KEY_PREFIX + blk;
-            if (blockDataTable.get(deletingKey) != null
-                || deletedBlocksTable.get(blk) != null) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug(String.format(
-                    "Ignoring delete for block %s in container %d."
-                        + " Entry already added.", blk, containerId));
-              }
-              continue;
+      for (Long blkLong : delTX.getLocalIDList()) {
+        String blk = blkLong.toString();
+        BlockData blkInfo = blockDataTable.get(blk);
+        if (blkInfo != null) {
+          String deletingKey = OzoneConsts.DELETING_KEY_PREFIX + blk;
+
+          if (blockDataTable.get(deletingKey) != null
+              || deletedBlocksTable.get(blk) != null) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(String.format(
+                  "Ignoring delete for block %s in container %d."
+                      + " Entry already added.", blk, containerId));
             }
+            continue;
+          }
+
+          try(BatchOperation batch = containerDB.getStore()
+              .getBatchHandler().initBatchOperation()) {
             // Found the block in container db,
             // use an atomic update to change its state to deleting.
             blockDataTable.putWithBatch(batch, deletingKey, blkInfo);
             blockDataTable.deleteWithBatch(batch, blk);
+            containerDB.getStore().getBatchHandler()
+                .commitBatchOperation(batch);
             newDeletionBlocks++;
             if (LOG.isDebugEnabled()) {
               LOG.debug("Transited Block {} to DELETING state in container {}",
                   blk, containerId);
             }
-          } else {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Block {} not found or already under deletion in"
-                  + " container {}, skip deleting it.", blk, containerId);
-            }
+          } catch (IOException e) {
+            // if some blocks failed to delete, we fail this TX,
+            // without sending this ACK to SCM, SCM will resend the TX
+            // with a certain number of retries.
+            throw new IOException(
+                "Failed to delete blocks for TXID = " + delTX.getTxID(), e);
+          }
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Block {} not found or already under deletion in"
+                + " container {}, skip deleting it.", blk, containerId);
           }
         }
-        updateMetaData(containerData, delTX, newDeletionBlocks, containerDB,
-            batch);
-        containerDB.getStore().getBatchHandler().commitBatchOperation(batch);
-      } catch (IOException e) {
-        // if some blocks failed to delete, we fail this TX,
-        // without sending this ACK to SCM, SCM will resend the TX
-        // with a certain number of retries.
-        throw new IOException(
-            "Failed to delete blocks for TXID = " + delTX.getTxID(), e);
       }
-    }
-  }
 
-  private void updateMetaData(KeyValueContainerData containerData,
-      DeletedBlocksTransaction delTX, int newDeletionBlocks,
-      ReferenceCountedDB containerDB, BatchOperation batchOperation)
-      throws IOException {
-    if (newDeletionBlocks > 0) {
-      // Finally commit the DB counters.
-      Table<String, Long> metadataTable =
-          containerDB.getStore().getMetadataTable();
+      if (newDeletionBlocks > 0) {
+        // Finally commit the DB counters.
+        try(BatchOperation batchOperation =
+                containerDB.getStore().getBatchHandler().initBatchOperation()) {
+          Table< String, Long > metadataTable = containerDB.getStore()
+              .getMetadataTable();
 
-      // In memory is updated only when existing delete transactionID is
-      // greater.
-      if (delTX.getTxID() > containerData.getDeleteTransactionId()) {
-        // Update in DB pending delete key count and delete transaction ID.
-        metadataTable
-            .putWithBatch(batchOperation, OzoneConsts.DELETE_TRANSACTION_KEY,
-                delTX.getTxID());
-      }
+          // In memory is updated only when existing delete transactionID is
+          // greater.
+          if (delTX.getTxID() > containerData.getDeleteTransactionId()) {
+            // Update in DB pending delete key count and delete transaction ID.
+            metadataTable.putWithBatch(batchOperation,
+                OzoneConsts.DELETE_TRANSACTION_KEY, delTX.getTxID());
+          }
 
-      long pendingDeleteBlocks =
-          containerData.getNumPendingDeletionBlocks() + newDeletionBlocks;
-      metadataTable
-          .putWithBatch(batchOperation, OzoneConsts.PENDING_DELETE_BLOCK_COUNT,
-              pendingDeleteBlocks);
+          long pendingDeleteBlocks =
+              containerData.getNumPendingDeletionBlocks() + newDeletionBlocks;
+          metadataTable.putWithBatch(batchOperation,
+              OzoneConsts.PENDING_DELETE_BLOCK_COUNT, pendingDeleteBlocks);
 
-      // update pending deletion blocks count and delete transaction ID in
-      // in-memory container status
-      containerData.updateDeleteTransactionId(delTX.getTxID());
-      containerData.incrPendingDeletionBlocks(newDeletionBlocks);
-    }
-  }
+          containerDB.getStore().getBatchHandler()
+              .commitBatchOperation(batchOperation);
 
-  private boolean isTxnIdValid(long containerId,
-      KeyValueContainerData containerData, DeletedBlocksTransaction delTX) {
-    boolean b = true;
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing Container : {}, DB path : {}", containerId,
-          containerData.getMetadataPath());
-    }
+          // update pending deletion blocks count and delete transaction ID in
+          // in-memory container status
+          containerData.updateDeleteTransactionId(delTX.getTxID());
 
-    if (delTX.getTxID() < containerData.getDeleteTransactionId()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("Ignoring delete blocks for containerId: %d."
-                + " Outdated delete transactionId %d < %d", containerId,
-            delTX.getTxID(), containerData.getDeleteTransactionId()));
+          containerData.incrPendingDeletionBlocks(newDeletionBlocks);
+        }
       }
-      b = false;
     }
-    return b;
   }
 
   @Override
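
The main behavioral change in this file is batching. The schema V2 path being
reverted staged a whole DeletedBlocksTransaction in a single BatchOperation
and committed once, while the restored deleteKeyValueContainerBlocks commits a
small batch per block. A minimal sketch of the single-batch shape, assuming
containerDB, blockDataTable, and delTX are in scope as in the code above:

    // Sketch of the one-batch-per-transaction pattern (the reverted V2 path):
    // stage every per-block update, then commit atomically at the end, so a
    // failure anywhere aborts the whole transaction and SCM will resend it.
    try (BatchOperation batch = containerDB.getStore().getBatchHandler()
        .initBatchOperation()) {
      for (Long blkLong : delTX.getLocalIDList()) {
        String blk = blkLong.toString();
        BlockData blkInfo = blockDataTable.get(blk);
        if (blkInfo != null) {
          // Stage the state change to DELETING; nothing is written yet.
          blockDataTable.putWithBatch(batch,
              OzoneConsts.DELETING_KEY_PREFIX + blk, blkInfo);
          blockDataTable.deleteWithBatch(batch, blk);
        }
      }
      containerDB.getStore().getBatchHandler().commitBatchOperation(batch);
    }
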
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
index 3dab1fa..b03b7d7 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
@@ -20,12 +20,11 @@ package org.apache.hadoop.ozone.container.keyvalue.statemachine.background;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.UUID;
 import java.util.LinkedList;
-import java.util.Objects;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -33,15 +32,14 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
-import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
-import org.apache.hadoop.hdds.utils.BackgroundService;
-import org.apache.hadoop.hdds.utils.BackgroundTask;
 import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter;
 import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.TopNOrderedContainerDeletionChoosingPolicy;
@@ -52,22 +50,14 @@ import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverSe
 import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
-import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
-import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.util.Time;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 
 import com.google.common.collect.Lists;
-
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1;
-import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2;
-
 import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,7 +66,7 @@ import org.slf4j.LoggerFactory;
  * A per-datanode container block deleting service takes in charge
  * of deleting staled ozone blocks.
  */
-
+// TODO: Fix BlockDeletingService to work with new StorageLayer
 public class BlockDeletingService extends BackgroundService {
 
   private static final Logger LOG =
@@ -254,54 +244,21 @@ public class BlockDeletingService extends BackgroundService {
 
     @Override
     public BackgroundTaskResult call() throws Exception {
-      ContainerBackgroundTaskResult crr;
+      ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
       final Container container = ozoneContainer.getContainerSet()
           .getContainer(containerData.getContainerID());
       container.writeLock();
-      File dataDir = new File(containerData.getChunksPath());
       long startTime = Time.monotonicNow();
       // Scan container's db and get list of under deletion blocks
       try (ReferenceCountedDB meta = BlockUtils.getDB(containerData, conf)) {
-        if (containerData.getSchemaVersion().equals(SCHEMA_V1)) {
-          crr = deleteViaSchema1(meta, container, dataDir, startTime);
-        } else if (containerData.getSchemaVersion().equals(SCHEMA_V2)) {
-          crr = deleteViaSchema2(meta, container, dataDir, startTime);
-        } else {
-          throw new UnsupportedOperationException(
-              "Only schema version 1 and schema version 2 are supported.");
-        }
-        return crr;
-      } finally {
-        container.writeUnlock();
-      }
-    }
-
-    public boolean checkDataDir(File dataDir) {
-      boolean b = true;
-      if (!dataDir.exists() || !dataDir.isDirectory()) {
-        LOG.error("Invalid container data dir {} : "
-            + "does not exist or not a directory", dataDir.getAbsolutePath());
-        b = false;
-      }
-      return b;
-    }
-
-    public ContainerBackgroundTaskResult deleteViaSchema1(
-        ReferenceCountedDB meta, Container container, File dataDir,
-        long startTime) throws IOException {
-      ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
-      if (!checkDataDir(dataDir)) {
-        return crr;
-      }
-      try {
         Table<String, BlockData> blockDataTable =
-            meta.getStore().getBlockDataTable();
+                meta.getStore().getBlockDataTable();
 
         // # of blocks to delete is throttled
         KeyPrefixFilter filter = MetadataKeyFilters.getDeletingKeyFilter();
         List<? extends Table.KeyValue<String, BlockData>> toDeleteBlocks =
             blockDataTable.getSequentialRangeKVs(null, blockLimitPerTask,
-                filter);
+                    filter);
         if (toDeleteBlocks.isEmpty()) {
           LOG.debug("No under deletion block found in container : {}",
               containerData.getContainerID());
@@ -310,6 +267,12 @@ public class BlockDeletingService extends BackgroundService {
         List<String> succeedBlocks = new LinkedList<>();
         LOG.debug("Container : {}, To-Delete blocks : {}",
             containerData.getContainerID(), toDeleteBlocks.size());
+        File dataDir = new File(containerData.getChunksPath());
+        if (!dataDir.exists() || !dataDir.isDirectory()) {
+          LOG.error("Invalid container data dir {} : "
+              + "does not exist or not a directory", dataDir.getAbsolutePath());
+          return crr;
+        }
 
         Handler handler = Objects.requireNonNull(ozoneContainer.getDispatcher()
             .getHandler(container.getContainerType()));
@@ -329,7 +292,7 @@ public class BlockDeletingService extends BackgroundService {
 
         // Once blocks are deleted... remove the blockID from blockDataTable.
         try(BatchOperation batch = meta.getStore().getBatchHandler()
-            .initBatchOperation()) {
+                .initBatchOperation()) {
           for (String entry : succeedBlocks) {
             blockDataTable.deleteWithBatch(batch, entry);
           }
@@ -349,106 +312,8 @@ public class BlockDeletingService extends BackgroundService {
         }
         crr.addAll(succeedBlocks);
         return crr;
-      } catch (IOException exception) {
-        LOG.warn(
-            "Deletion operation was not successful for container: " + container
-                .getContainerData().getContainerID(), exception);
-        throw exception;
-      }
-    }
-
-    public ContainerBackgroundTaskResult deleteViaSchema2(
-        ReferenceCountedDB meta, Container container, File dataDir,
-        long startTime) throws IOException {
-      ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
-      if (!checkDataDir(dataDir)) {
-        return crr;
-      }
-      try {
-        Table<String, BlockData> blockDataTable =
-            meta.getStore().getBlockDataTable();
-        DatanodeStore ds = meta.getStore();
-        DatanodeStoreSchemaTwoImpl dnStoreTwoImpl =
-            (DatanodeStoreSchemaTwoImpl) ds;
-        Table<Long, DeletedBlocksTransaction>
-            deleteTxns = dnStoreTwoImpl.getDeleteTransactionTable();
-        List<DeletedBlocksTransaction> delBlocks = new ArrayList<>();
-        int totalBlocks = 0;
-        try (TableIterator<Long,
-            ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
-            dnStoreTwoImpl.getDeleteTransactionTable().iterator()) {
-          while (iter.hasNext() && (totalBlocks < blockLimitPerTask)) {
-            DeletedBlocksTransaction delTx = iter.next().getValue();
-            totalBlocks += delTx.getLocalIDList().size();
-            delBlocks.add(delTx);
-          }
-        }
-
-        if (delBlocks.isEmpty()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("No transaction found in container : {}",
-                containerData.getContainerID());
-          }
-          return crr;
-        }
-
-        LOG.debug("Container : {}, To-Delete blocks : {}",
-            containerData.getContainerID(), delBlocks.size());
-
-        Handler handler = Objects.requireNonNull(ozoneContainer.getDispatcher()
-            .getHandler(container.getContainerType()));
-
-        deleteTransactions(delBlocks, handler, blockDataTable, container);
-
-        // Once blocks are deleted... remove the blockID from blockDataTable
-        // and also remove the transactions from txnTable.
-        try(BatchOperation batch = meta.getStore().getBatchHandler()
-            .initBatchOperation()) {
-          for (DeletedBlocksTransaction delTx : delBlocks) {
-            deleteTxns.deleteWithBatch(batch, delTx.getTxID());
-            for (Long blk : delTx.getLocalIDList()) {
-              String bID = blk.toString();
-              meta.getStore().getBlockDataTable().deleteWithBatch(batch, bID);
-            }
-          }
-          meta.getStore().getBatchHandler().commitBatchOperation(batch);
-          containerData.updateAndCommitDBCounters(meta, batch,
-              totalBlocks);
-          // update count of pending deletion blocks and block count in
-          // in-memory container status.
-          containerData.decrPendingDeletionBlocks(totalBlocks);
-          containerData.decrKeyCount(totalBlocks);
-        }
-
-        LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms",
-            containerData.getContainerID(), totalBlocks,
-            Time.monotonicNow() - startTime);
-
-        return crr;
-      } catch (IOException exception) {
-        LOG.warn(
-            "Deletion operation was not successful for container: " + container
-                .getContainerData().getContainerID(), exception);
-        throw exception;
-      }
-    }
-
-    private void deleteTransactions(List<DeletedBlocksTransaction> delBlocks,
-        Handler handler, Table<String, BlockData> blockDataTable,
-        Container container) throws IOException {
-      for (DeletedBlocksTransaction entry : delBlocks) {
-        for (Long blkLong : entry.getLocalIDList()) {
-          String blk = blkLong.toString();
-          BlockData blkInfo = blockDataTable.get(blk);
-          LOG.debug("Deleting block {}", blk);
-          try {
-            handler.deleteBlock(container, blkInfo);
-          } catch (InvalidProtocolBufferException e) {
-            LOG.error("Failed to parse block info for block {}", blk, e);
-          } catch (IOException e) {
-            LOG.error("Failed to delete files for block {}", blk, e);
-          }
-        }
+      } finally {
+        container.writeUnlock();
       }
     }
 
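For reference, the deleted deleteViaSchema2 path above throttled its work by
draining pending transactions from the delete-transaction table until a
per-task block limit was reached. A condensed sketch, assuming dnStoreTwoImpl
and blockLimitPerTask are in scope as they were in the removed code:

    // Sketch: read pending delete transactions, capped at blockLimitPerTask
    // blocks, so one background task cannot monopolize the container lock.
    List<DeletedBlocksTransaction> delBlocks = new ArrayList<>();
    int totalBlocks = 0;
    try (TableIterator<Long,
        ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
        dnStoreTwoImpl.getDeleteTransactionTable().iterator()) {
      while (iter.hasNext() && totalBlocks < blockLimitPerTask) {
        DeletedBlocksTransaction delTx = iter.next().getValue();
        totalBlocks += delTx.getLocalIDList().size();
        delBlocks.add(delTx);
      }
    }
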
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
index 2fb1174..8895475 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
@@ -60,7 +60,7 @@ public abstract class AbstractDatanodeDBDefinition implements DBDefinition {
   @Override
   public DBColumnFamilyDefinition[] getColumnFamilies() {
     return new DBColumnFamilyDefinition[] {getBlockDataColumnFamily(),
-        getMetadataColumnFamily(), getDeletedBlocksColumnFamily()};
+            getMetadataColumnFamily(), getDeletedBlocksColumnFamily()};
   }
 
   public abstract DBColumnFamilyDefinition<String, BlockData>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java
index 7d5e053..faf399d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java
@@ -88,9 +88,4 @@ public class DatanodeSchemaOneDBDefinition
       getDeletedBlocksColumnFamily() {
     return DELETED_BLOCKS;
   }
-
-  public DBColumnFamilyDefinition[] getColumnFamilies() {
-    return new DBColumnFamilyDefinition[] {getBlockDataColumnFamily(),
-        getMetadataColumnFamily(), getDeletedBlocksColumnFamily() };
-  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java
index 1fabd13..2ac56f2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java
@@ -17,19 +17,16 @@
  */
 package org.apache.hadoop.ozone.container.metadata;
 
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
 import org.apache.hadoop.hdds.utils.db.LongCodec;
 import org.apache.hadoop.hdds.utils.db.StringCodec;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 
 /**
  * This class defines the RocksDB structure for datanodes following schema
- * version 2, where the block data, metadata, and transactions which are to be
- * deleted are put in their own separate column families.
+ * version 2, where the block data, metadata, and deleted block ids are put in
+ * their own separate column families.
  */
 public class DatanodeSchemaTwoDBDefinition extends
         AbstractDatanodeDBDefinition {
@@ -37,7 +34,7 @@ public class DatanodeSchemaTwoDBDefinition extends
   public static final DBColumnFamilyDefinition<String, BlockData>
           BLOCK_DATA =
           new DBColumnFamilyDefinition<>(
-                  "blockData",
+                  "block_data",
                   String.class,
                   new StringCodec(),
                   BlockData.class,
@@ -55,33 +52,17 @@ public class DatanodeSchemaTwoDBDefinition extends
   public static final DBColumnFamilyDefinition<String, ChunkInfoList>
           DELETED_BLOCKS =
           new DBColumnFamilyDefinition<>(
-                  "deletedBlocks",
+                  "deleted_blocks",
                   String.class,
                   new StringCodec(),
                   ChunkInfoList.class,
                   new ChunkInfoListCodec());
 
-  public static final DBColumnFamilyDefinition<Long, DeletedBlocksTransaction>
-      DELETE_TRANSACTION =
-      new DBColumnFamilyDefinition<>(
-          "deleteTxns",
-          Long.class,
-          new LongCodec(),
-          StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction.class,
-          new DeletedBlocksTransactionCodec());
-
   protected DatanodeSchemaTwoDBDefinition(String dbPath) {
     super(dbPath);
   }
 
   @Override
-  public DBColumnFamilyDefinition[] getColumnFamilies() {
-    return new DBColumnFamilyDefinition[] {getBlockDataColumnFamily(),
-        getMetadataColumnFamily(), getDeletedBlocksColumnFamily(),
-        getDeleteTransactionsColumnFamily()};
-  }
-
-  @Override
   public DBColumnFamilyDefinition<String, BlockData>
       getBlockDataColumnFamily() {
     return BLOCK_DATA;
@@ -97,9 +78,4 @@ public class DatanodeSchemaTwoDBDefinition extends
       getDeletedBlocksColumnFamily() {
     return DELETED_BLOCKS;
   }
-
-  public DBColumnFamilyDefinition<Long, DeletedBlocksTransaction>
-      getDeleteTransactionsColumnFamily() {
-    return DELETE_TRANSACTION;
-  }
 }
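
Each DBColumnFamilyDefinition above binds a RocksDB column family name to its
key/value types and codecs. A hypothetical definition in the same style
(EXAMPLE_FAMILY and its table name are illustrative, not part of the patch):

    public static final DBColumnFamilyDefinition<Long, ChunkInfoList>
        EXAMPLE_FAMILY =
        new DBColumnFamilyDefinition<>(
            "example_family",           // column family name in RocksDB
            Long.class,                 // key type
            new LongCodec(),            // key codec
            ChunkInfoList.class,        // value type
            new ChunkInfoListCodec());  // value codec
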
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
index db8fe6b..df9b8c0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
@@ -18,9 +18,6 @@
 package org.apache.hadoop.ozone.container.metadata;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.protocol.proto.
-    StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
-import org.apache.hadoop.hdds.utils.db.Table;
 
 import java.io.IOException;
 
@@ -29,13 +26,10 @@ import java.io.IOException;
  * three column families/tables:
  * 1. A block data table.
  * 2. A metadata table.
- * 3. A Delete Transaction Table.
+ * 3. A deleted blocks table.
  */
 public class DatanodeStoreSchemaTwoImpl extends AbstractDatanodeStore {
 
-  private final Table<Long, DeletedBlocksTransaction>
-      deleteTransactionTable;
-
   /**
    * Constructs the datanode store and starts the DB Services.
    *
@@ -47,11 +41,5 @@ public class DatanodeStoreSchemaTwoImpl extends AbstractDatanodeStore {
       throws IOException {
     super(config, containerID, new DatanodeSchemaTwoDBDefinition(dbPath),
         openReadOnly);
-    this.deleteTransactionTable = new DatanodeSchemaTwoDBDefinition(dbPath)
-        .getDeleteTransactionsColumnFamily().getTable(getStore());
-  }
-
-  public Table<Long, DeletedBlocksTransaction> getDeleteTransactionTable() {
-    return deleteTransactionTable;
   }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DeletedBlocksTransactionCodec.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DeletedBlocksTransactionCodec.java
deleted file mode 100644
index 90c26fe..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DeletedBlocksTransactionCodec.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.metadata;
-
-import org.apache.hadoop.hdds.utils.db.Codec;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
-
-import java.io.IOException;
-
-/**
- * Supports encoding and decoding {@link DeletedBlocksTransaction} objects.
- */
-public class DeletedBlocksTransactionCodec
-    implements Codec<DeletedBlocksTransaction> {
-
-  @Override public byte[] toPersistedFormat(
-      DeletedBlocksTransaction deletedBlocksTransaction) {
-    return deletedBlocksTransaction.toByteArray();
-  }
-
-  @Override public DeletedBlocksTransaction fromPersistedFormat(byte[] rawData)
-      throws IOException {
-    return DeletedBlocksTransaction.parseFrom(rawData);
-  }
-
-  @Override public DeletedBlocksTransaction copyObject(
-      DeletedBlocksTransaction deletedBlocksTransaction) {
-    throw new UnsupportedOperationException();
-  }
-}
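
The deleted codec above illustrates the Codec contract used by these
definitions: serialize a value to bytes for RocksDB and parse it back on read.
A sketch of the round trip, assuming tx is an in-scope
DeletedBlocksTransaction (fromPersistedFormat may throw IOException):

    DeletedBlocksTransactionCodec codec = new DeletedBlocksTransactionCodec();
    byte[] raw = codec.toPersistedFormat(tx);            // protobuf bytes
    DeletedBlocksTransaction back = codec.fromPersistedFormat(raw);
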
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
index 96d4228..2eb6a39 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
@@ -21,10 +21,9 @@ package org.apache.hadoop.ozone.container.common;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.UUID;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -35,13 +34,9 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.MutableConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
-import org.apache.hadoop.hdds.utils.db.BatchOperation;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
@@ -60,6 +55,7 @@ import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
@@ -68,14 +64,11 @@ import org.apache.hadoop.ozone.container.keyvalue.impl.FilePerBlockStrategy;
 import org.apache.hadoop.ozone.container.keyvalue.impl.FilePerChunkStrategy;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
-import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
-import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.container.testutils.BlockDeletingServiceTestImpl;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 
-import static java.util.stream.Collectors.toList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
 
 import org.junit.AfterClass;
@@ -89,11 +82,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTA
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
-import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_VERSIONS;
-import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1;
-import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2;
 import static org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion.FILE_PER_BLOCK;
-import static org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask.LOG;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -112,38 +101,16 @@ public class TestBlockDeletingService {
   private static MutableConfigurationSource conf;
 
   private final ChunkLayOutVersion layout;
-  private final String schemaVersion;
   private int blockLimitPerTask;
   private static VolumeSet volumeSet;
 
-  public TestBlockDeletingService(LayoutInfo layoutInfo) {
-    this.layout = layoutInfo.layout;
-    this.schemaVersion = layoutInfo.schemaVersion;
+  public TestBlockDeletingService(ChunkLayOutVersion layout) {
+    this.layout = layout;
   }
 
   @Parameterized.Parameters
   public static Iterable<Object[]> parameters() {
-    return LayoutInfo.layoutList.stream().map(each -> new Object[] {each})
-        .collect(toList());
-  }
-
-  public static class LayoutInfo {
-    private final String schemaVersion;
-    private final ChunkLayOutVersion layout;
-
-    public LayoutInfo(String schemaVersion, ChunkLayOutVersion layout) {
-      this.schemaVersion = schemaVersion;
-      this.layout = layout;
-    }
-
-    private static List<LayoutInfo> layoutList = new ArrayList<>();
-    static {
-      for (ChunkLayOutVersion ch : ChunkLayOutVersion.getAllVersions()) {
-        for (String sch : SCHEMA_VERSIONS) {
-          layoutList.add(new LayoutInfo(sch, ch));
-        }
-      }
-    }
+    return ChunkLayoutTestInfo.chunkLayoutParameters();
   }
 
   @BeforeClass
@@ -191,7 +158,6 @@ public class TestBlockDeletingService {
     }
     byte[] arr = randomAlphanumeric(1048576).getBytes(UTF_8);
     ChunkBuffer buffer = ChunkBuffer.wrap(ByteBuffer.wrap(arr));
-    int txnID = 0;
     for (int x = 0; x < numOfContainers; x++) {
       long containerID = ContainerTestHelper.getTestContainerID();
       KeyValueContainerData data =
@@ -199,164 +165,55 @@ public class TestBlockDeletingService {
               ContainerTestHelper.CONTAINER_MAX_SIZE,
               UUID.randomUUID().toString(), datanodeUuid);
       data.closeContainer();
-      data.setSchemaVersion(schemaVersion);
       KeyValueContainer container = new KeyValueContainer(data, conf);
       container.create(volumeSet,
           new RoundRobinVolumeChoosingPolicy(), scmId);
       containerSet.addContainer(container);
       data = (KeyValueContainerData) containerSet.getContainer(
           containerID).getContainerData();
-      if (data.getSchemaVersion().equals(SCHEMA_V1)) {
-        createPendingDeleteBlocksSchema1(numOfBlocksPerContainer, data,
-            containerID, numOfChunksPerBlock, buffer, chunkManager, container);
-      } else if (data.getSchemaVersion().equals(SCHEMA_V2)) {
-        createPendingDeleteBlocksSchema2(numOfBlocksPerContainer, txnID,
-            containerID, numOfChunksPerBlock, buffer, chunkManager, container,
-            data);
-      } else {
-        throw new UnsupportedOperationException(
-            "Only schema version 1 and schema version 2 are "
-                + "supported.");
-      }
-    }
-  }
-
-  @SuppressWarnings("checkstyle:parameternumber")
-  private void createPendingDeleteBlocksSchema1(int numOfBlocksPerContainer,
-      KeyValueContainerData data, long containerID, int numOfChunksPerBlock,
-      ChunkBuffer buffer, ChunkManager chunkManager,
-      KeyValueContainer container) {
-    BlockID blockID = null;
-    try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
-      for (int j = 0; j < numOfBlocksPerContainer; j++) {
-        blockID = ContainerTestHelper.getTestBlockID(containerID);
-        String deleteStateName =
-            OzoneConsts.DELETING_KEY_PREFIX + blockID.getLocalID();
-        BlockData kd = new BlockData(blockID);
-        List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
-        putChunksInBlock(numOfChunksPerBlock, j, chunks, buffer, chunkManager,
-            container, blockID);
-        kd.setChunks(chunks);
-        metadata.getStore().getBlockDataTable().put(deleteStateName, kd);
-        container.getContainerData().incrPendingDeletionBlocks(1);
-      }
-      updateMetaData(data, container, numOfBlocksPerContainer,
-          numOfChunksPerBlock);
-    } catch (IOException exception) {
-      LOG.info("Exception " + exception);
-      LOG.warn("Failed to put block: " + blockID + " in BlockDataTable.");
-    }
-  }
-
-  @SuppressWarnings("checkstyle:parameternumber")
-  private void createPendingDeleteBlocksSchema2(int numOfBlocksPerContainer,
-      int txnID, long containerID, int numOfChunksPerBlock, ChunkBuffer buffer,
-      ChunkManager chunkManager, KeyValueContainer container,
-      KeyValueContainerData data) {
-    List<Long> containerBlocks = new ArrayList<>();
-    int blockCount = 0;
-    for (int i = 0; i < numOfBlocksPerContainer; i++) {
-      txnID = txnID + 1;
-      BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
-      BlockData kd = new BlockData(blockID);
-      List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
-      putChunksInBlock(numOfChunksPerBlock, i, chunks, buffer, chunkManager,
-          container, blockID);
-      kd.setChunks(chunks);
-      String bID = null;
-      try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
-        bID = blockID.getLocalID() + "";
-        metadata.getStore().getBlockDataTable().put(bID, kd);
-      } catch (IOException exception) {
-        LOG.info("Exception = " + exception);
-        LOG.warn("Failed to put block: " + bID + " in BlockDataTable.");
-      }
-      container.getContainerData().incrPendingDeletionBlocks(1);
-
-      // In below if statements we are checking if a single container
-      // consists of more blocks than 'blockLimitPerTask' then we create
-      // (totalBlocksInContainer / blockLimitPerTask) transactions which
-      // consists of blocks equal to blockLimitPerTask and last transaction
-      // consists of blocks equal to
-      // (totalBlocksInContainer % blockLimitPerTask).
-      containerBlocks.add(blockID.getLocalID());
-      blockCount++;
-      if (blockCount == blockLimitPerTask || i == (numOfBlocksPerContainer
-          - 1)) {
-        createTxn(data, containerBlocks, txnID, containerID);
-        containerBlocks.clear();
-        blockCount = 0;
-      }
-    }
-    updateMetaData(data, container, numOfBlocksPerContainer,
-        numOfChunksPerBlock);
-  }
-
-  private void createTxn(KeyValueContainerData data, List<Long> containerBlocks,
-      int txnID, long containerID) {
-    try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
-      StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction dtx =
-          StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction
-              .newBuilder().setTxID(txnID).setContainerID(containerID)
-              .addAllLocalID(containerBlocks).setCount(0).build();
-      try (BatchOperation batch = metadata.getStore().getBatchHandler()
-          .initBatchOperation()) {
-        DatanodeStore ds = metadata.getStore();
-        DatanodeStoreSchemaTwoImpl dnStoreTwoImpl =
-            (DatanodeStoreSchemaTwoImpl) ds;
-        dnStoreTwoImpl.getDeleteTransactionTable()
-            .putWithBatch(batch, (long) txnID, dtx);
-        metadata.getStore().getBatchHandler().commitBatchOperation(batch);
-      }
-    } catch (IOException exception) {
-      LOG.warn("Transaction creation was not successful for txnID: " + txnID
-          + " consisting of " + containerBlocks.size() + " blocks.");
-    }
-  }
-
-  private void putChunksInBlock(int numOfChunksPerBlock, int i,
-      List<ContainerProtos.ChunkInfo> chunks, ChunkBuffer buffer,
-      ChunkManager chunkManager, KeyValueContainer container, BlockID blockID) {
-    long chunkLength = 100;
-    try {
-      for (int k = 0; k < numOfChunksPerBlock; k++) {
-        final String chunkName = String.format("block.%d.chunk.%d", i, k);
-        final long offset = k * chunkLength;
-        ContainerProtos.ChunkInfo info =
-            ContainerProtos.ChunkInfo.newBuilder().setChunkName(chunkName)
-                .setLen(chunkLength).setOffset(offset)
-                .setChecksumData(Checksum.getNoChecksumDataProto()).build();
-        chunks.add(info);
-        ChunkInfo chunkInfo = new ChunkInfo(chunkName, offset, chunkLength);
-        ChunkBuffer chunkData = buffer.duplicate(0, (int) chunkLength);
-        chunkManager
-            .writeChunk(container, blockID, chunkInfo, chunkData, WRITE_STAGE);
-        chunkManager
-            .writeChunk(container, blockID, chunkInfo, chunkData, COMMIT_STAGE);
+      long chunkLength = 100;
+      try(ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
+        for (int j = 0; j < numOfBlocksPerContainer; j++) {
+          BlockID blockID =
+              ContainerTestHelper.getTestBlockID(containerID);
+          String deleteStateName = OzoneConsts.DELETING_KEY_PREFIX +
+              blockID.getLocalID();
+          BlockData kd = new BlockData(blockID);
+          List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
+          for (int k = 0; k < numOfChunksPerBlock; k++) {
+            final String chunkName = String.format("block.%d.chunk.%d", j, k);
+            final long offset = k * chunkLength;
+            ContainerProtos.ChunkInfo info =
+                ContainerProtos.ChunkInfo.newBuilder()
+                    .setChunkName(chunkName)
+                    .setLen(chunkLength)
+                    .setOffset(offset)
+                    .setChecksumData(Checksum.getNoChecksumDataProto())
+                    .build();
+            chunks.add(info);
+            ChunkInfo chunkInfo = new ChunkInfo(chunkName, offset, chunkLength);
+            ChunkBuffer chunkData = buffer.duplicate(0, (int) chunkLength);
+            chunkManager.writeChunk(container, blockID, chunkInfo, chunkData,
+                WRITE_STAGE);
+            chunkManager.writeChunk(container, blockID, chunkInfo, chunkData,
+                COMMIT_STAGE);
+          }
+          kd.setChunks(chunks);
+          metadata.getStore().getBlockDataTable().put(
+                  deleteStateName, kd);
+          container.getContainerData().incrPendingDeletionBlocks(1);
+        }
+        container.getContainerData().setKeyCount(numOfBlocksPerContainer);
+        // Set block count, bytes used and pending delete block count.
+        metadata.getStore().getMetadataTable().put(
+                OzoneConsts.BLOCK_COUNT, (long)numOfBlocksPerContainer);
+        metadata.getStore().getMetadataTable().put(
+                OzoneConsts.CONTAINER_BYTES_USED,
+            chunkLength * numOfChunksPerBlock * numOfBlocksPerContainer);
+        metadata.getStore().getMetadataTable().put(
+                OzoneConsts.PENDING_DELETE_BLOCK_COUNT,
+                (long)numOfBlocksPerContainer);
       }
-    } catch (IOException ex) {
-      LOG.warn("Putting chunks in blocks was not successful for BlockID: "
-          + blockID);
-    }
-  }
-
-  private void updateMetaData(KeyValueContainerData data,
-      KeyValueContainer container, int numOfBlocksPerContainer,
-      int numOfChunksPerBlock) {
-    long chunkLength = 100;
-    try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
-      container.getContainerData().setKeyCount(numOfBlocksPerContainer);
-      // Set block count, bytes used and pending delete block count.
-      metadata.getStore().getMetadataTable()
-          .put(OzoneConsts.BLOCK_COUNT, (long) numOfBlocksPerContainer);
-      metadata.getStore().getMetadataTable()
-          .put(OzoneConsts.CONTAINER_BYTES_USED,
-              chunkLength * numOfChunksPerBlock * numOfBlocksPerContainer);
-      metadata.getStore().getMetadataTable()
-          .put(OzoneConsts.PENDING_DELETE_BLOCK_COUNT,
-              (long) numOfBlocksPerContainer);
-    } catch (IOException exception) {
-      LOG.warn("Meta Data update was not successful for container: "+container);
     }
   }
 
@@ -374,32 +231,11 @@ public class TestBlockDeletingService {
    * Get under deletion blocks count from DB,
    * note this info is parsed from container.db.
    */
-  private int getUnderDeletionBlocksCount(ReferenceCountedDB meta,
-      KeyValueContainerData data) throws IOException {
-    if (data.getSchemaVersion().equals(SCHEMA_V1)) {
-      return meta.getStore().getBlockDataTable()
-          .getRangeKVs(null, 100, MetadataKeyFilters.getDeletingKeyFilter())
-          .size();
-    } else if (data.getSchemaVersion().equals(SCHEMA_V2)) {
-      int pendingBlocks = 0;
-      DatanodeStore ds = meta.getStore();
-      DatanodeStoreSchemaTwoImpl dnStoreTwoImpl =
-          (DatanodeStoreSchemaTwoImpl) ds;
-      try (
-          TableIterator<Long, ? extends Table.KeyValue<Long, 
-              StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction>> 
-              iter = dnStoreTwoImpl.getDeleteTransactionTable().iterator()) {
-        while (iter.hasNext()) {
-          StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction
-              delTx = iter.next().getValue();
-          pendingBlocks += delTx.getLocalIDList().size();
-        }
-      }
-      return pendingBlocks;
-    } else {
-      throw new UnsupportedOperationException(
-          "Only schema version 1 and schema version 2 are supported.");
-    }
+  private int getUnderDeletionBlocksCount(ReferenceCountedDB meta)
+      throws IOException {
+    return meta.getStore().getBlockDataTable()
+        .getRangeKVs(null, 100,
+        MetadataKeyFilters.getDeletingKeyFilter()).size();
   }
 
 
@@ -425,7 +261,6 @@ public class TestBlockDeletingService {
     // Ensure 1 container was created
     List<ContainerData> containerData = Lists.newArrayList();
     containerSet.listContainer(0L, 1, containerData);
-    KeyValueContainerData data = (KeyValueContainerData) containerData.get(0);
     Assert.assertEquals(1, containerData.size());
 
     try(ReferenceCountedDB meta = BlockUtils.getDB(
@@ -445,7 +280,7 @@ public class TestBlockDeletingService {
       Assert.assertEquals(0, transactionId);
 
       // Ensure there are 3 blocks under deletion and 0 deleted blocks
-      Assert.assertEquals(3, getUnderDeletionBlocksCount(meta, data));
+      Assert.assertEquals(3, getUnderDeletionBlocksCount(meta));
       Assert.assertEquals(3, meta.getStore().getMetadataTable()
           .get(OzoneConsts.PENDING_DELETE_BLOCK_COUNT).longValue());
 
@@ -513,9 +348,6 @@ public class TestBlockDeletingService {
   public void testBlockDeletionTimeout() throws Exception {
     conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10);
     conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 2);
-    this.blockLimitPerTask =
-        conf.getInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER,
-            OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT);
     ContainerSet containerSet = new ContainerSet();
     createToDeleteBlocks(containerSet, 1, 3, 1);
     ContainerMetrics metrics = ContainerMetrics.create(conf);
@@ -562,7 +394,7 @@ public class TestBlockDeletingService {
       LogCapturer newLog = LogCapturer.captureLogs(BackgroundService.LOG);
       GenericTestUtils.waitFor(() -> {
         try {
-          return getUnderDeletionBlocksCount(meta, data) == 0;
+          return getUnderDeletionBlocksCount(meta) == 0;
         } catch (IOException ignored) {
         }
         return false;
@@ -613,9 +445,6 @@ public class TestBlockDeletingService {
         TopNOrderedContainerDeletionChoosingPolicy.class.getName());
     conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 1);
     conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 1);
-    this.blockLimitPerTask =
-        conf.getInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER,
-            OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT);
     ContainerSet containerSet = new ContainerSet();
 
     int containerCount = 2;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 7bccd8e..548f073 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -92,15 +92,11 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION;
+import static org.apache.hadoop.hdds.HddsConfigKeys.*;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 /**
  * Test class that exercises the StorageContainerManager.
@@ -276,6 +272,8 @@ public class TestStorageContainerManager {
 
       Map<Long, List<Long>> containerBlocks = createDeleteTXLog(delLog,
           keyLocations, helper);
+      Set<Long> containerIDs = containerBlocks.keySet();
+
       // Verify a few TX gets created in the TX log.
       Assert.assertTrue(delLog.getNumOfValidTransactions() > 0);
 
@@ -291,7 +289,8 @@ public class TestStorageContainerManager {
           return false;
         }
       }, 1000, 10000);
-      Assert.assertTrue(helper.verifyBlocksWithTxnTable(containerBlocks));
+      Assert.assertTrue(helper.getAllBlocks(containerIDs).isEmpty());
+
       // Continue the work, add some TXs that with known container names,
       // but unknown block IDs.
       for (Long containerID : containerBlocks.keySet()) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
index fe0e075..c67fe30 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
@@ -17,7 +17,6 @@
 package org.apache.hadoop.ozone;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -33,14 +32,9 @@ import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
-import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
-import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
-
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -143,30 +137,6 @@ public class TestStorageContainerManagerHelper {
     return allBlocks;
   }
 
-  public boolean verifyBlocksWithTxnTable(Map<Long, List<Long>> containerBlocks)
-      throws IOException {
-    Set<Long> containerIDs = containerBlocks.keySet();
-    for (Long entry : containerIDs) {
-      ReferenceCountedDB meta = getContainerMetadata(entry);
-      DatanodeStore ds = meta.getStore();
-      DatanodeStoreSchemaTwoImpl dnStoreTwoImpl =
-          (DatanodeStoreSchemaTwoImpl) ds;
-      List<? extends Table.KeyValue<Long, DeletedBlocksTransaction>>
-          txnsInTxnTable = dnStoreTwoImpl.getDeleteTransactionTable()
-          .getRangeKVs(null, Integer.MAX_VALUE, null);
-      List<Long> conID = new ArrayList<>();
-      for (Table.KeyValue<Long, DeletedBlocksTransaction> txn :
-          txnsInTxnTable) {
-        conID.addAll(txn.getValue().getLocalIDList());
-      }
-      if (!conID.equals(containerBlocks.get(entry))) {
-        return false;
-      }
-      meta.close();
-    }
-    return true;
-  }
-
   private ReferenceCountedDB getContainerMetadata(Long containerID)
       throws IOException {
     ContainerWithPipeline containerWithPipeline = cluster




[ozone] 02/02: HDDS-4369. Datanode should store the delete transaction as is in rocksDB (#1702)


commit 431e9097b4d0e7f3d5de04ee86cc8d4ba0c05e30
Author: Aryan Gupta <44...@users.noreply.github.com>
AuthorDate: Wed Jan 6 20:15:44 2021 +0530

    HDDS-4369. Datanode should store the delete transaction as is in rocksDB (#1702)
---
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   6 +-
 .../commandhandler/DeleteBlocksCommandHandler.java | 191 ++++++++------
 .../background/BlockDeletingService.java           | 171 +++++++++++--
 .../metadata/AbstractDatanodeDBDefinition.java     |   2 +-
 .../metadata/DatanodeSchemaOneDBDefinition.java    |   5 +
 .../metadata/DatanodeSchemaTwoDBDefinition.java    |  32 ++-
 .../metadata/DatanodeStoreSchemaTwoImpl.java       |  14 +-
 ...mpl.java => DeletedBlocksTransactionCodec.java} |  35 +--
 .../container/common/TestBlockDeletingService.java | 279 +++++++++++++++++----
 .../hadoop/ozone/TestStorageContainerManager.java  |  13 +-
 .../ozone/TestStorageContainerManagerHelper.java   |  30 +++
 11 files changed, 604 insertions(+), 174 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index d2d6e35..9836452 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -254,11 +254,15 @@ public final class OzoneConsts {
   // versions, requiring this property to be tracked on a per container basis.
   // V1: All data in default column family.
   public static final String SCHEMA_V1 = "1";
-  // V2: Metadata, block data, and deleted blocks in their own column families.
+  // V2: Metadata, block data, and delete transactions in their own
+  // column families.
   public static final String SCHEMA_V2 = "2";
   // Most recent schema version that all new containers should be created with.
   public static final String SCHEMA_LATEST = SCHEMA_V2;
 
+  public static final String[] SCHEMA_VERSIONS =
+      new String[] {SCHEMA_V1, SCHEMA_V2};
+
   // Supported store types.
   public static final String OZONE = "ozone";
   public static final String S3 = "s3";
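
For reference, the new SCHEMA_VERSIONS array lets callers enumerate every supported schema, which the parameterized test further down relies on. As a standalone illustration, a hypothetical isSupported() helper (not part of this patch) could be written as:

    import org.apache.hadoop.ozone.OzoneConsts;

    public final class SchemaVersionCheck {
      private SchemaVersionCheck() {
      }

      // True if the datanode knows how to handle the given container
      // schema version (currently "1" or "2").
      public static boolean isSupported(String schemaVersion) {
        for (String known : OzoneConsts.SCHEMA_VERSIONS) {
          if (known.equals(schemaVersion)) {
            return true;
          }
        }
        return false;
      }
    }
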
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index 91ab4c9..10e6797 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.statemachine
     .SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlockCommandStatus;
@@ -59,6 +61,8 @@ import java.util.function.Consumer;
 
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.CONTAINER_NOT_FOUND;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2;
 
 /**
  * Handle block deletion commands.
@@ -116,6 +120,7 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
             DeleteBlockTransactionResult.newBuilder();
         txResultBuilder.setTxID(entry.getTxID());
         long containerId = entry.getContainerID();
+        int newDeletionBlocks = 0;
         try {
           Container cont = containerSet.getContainer(containerId);
           if (cont == null) {
@@ -129,7 +134,16 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
                 cont.getContainerData();
             cont.writeLock();
             try {
-              deleteKeyValueContainerBlocks(containerData, entry);
+              if (containerData.getSchemaVersion().equals(SCHEMA_V1)) {
+                markBlocksForDeletionSchemaV1(containerData, entry);
+              } else if (containerData.getSchemaVersion().equals(SCHEMA_V2)) {
+                markBlocksForDeletionSchemaV2(containerData, entry,
+                    newDeletionBlocks, entry.getTxID());
+              } else {
+                throw new UnsupportedOperationException(
+                    "Only schema version 1 and schema version 2 are "
+                        + "supported.");
+              }
             } finally {
               cont.writeUnlock();
             }
@@ -187,107 +201,140 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
    * @param delTX a block deletion transaction.
    * @throws IOException if I/O error occurs.
    */
-  private void deleteKeyValueContainerBlocks(
-      KeyValueContainerData containerData, DeletedBlocksTransaction delTX)
-      throws IOException {
+
+  private void markBlocksForDeletionSchemaV2(
+      KeyValueContainerData containerData, DeletedBlocksTransaction delTX,
+      int newDeletionBlocks, long txnID) throws IOException {
     long containerId = delTX.getContainerID();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing Container : {}, DB path : {}", containerId,
-          containerData.getMetadataPath());
+    if (!isTxnIdValid(containerId, containerData, delTX)) {
+      return;
     }
-
-    if (delTX.getTxID() < containerData.getDeleteTransactionId()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("Ignoring delete blocks for containerId: %d."
-                + " Outdated delete transactionId %d < %d", containerId,
-            delTX.getTxID(), containerData.getDeleteTransactionId()));
+    try (ReferenceCountedDB containerDB = BlockUtils
+        .getDB(containerData, conf)) {
+      DatanodeStore ds = containerDB.getStore();
+      DatanodeStoreSchemaTwoImpl dnStoreTwoImpl =
+          (DatanodeStoreSchemaTwoImpl) ds;
+      Table<Long, DeletedBlocksTransaction> delTxTable =
+          dnStoreTwoImpl.getDeleteTransactionTable();
+      try (BatchOperation batch = containerDB.getStore().getBatchHandler()
+          .initBatchOperation()) {
+        delTxTable.putWithBatch(batch, txnID, delTX);
+        newDeletionBlocks += delTX.getLocalIDList().size();
+        updateMetaData(containerData, delTX, newDeletionBlocks, containerDB,
+            batch);
+        containerDB.getStore().getBatchHandler().commitBatchOperation(batch);
       }
-      return;
     }
+  }
 
+  private void markBlocksForDeletionSchemaV1(
+      KeyValueContainerData containerData, DeletedBlocksTransaction delTX)
+      throws IOException {
+    long containerId = delTX.getContainerID();
+    if (!isTxnIdValid(containerId, containerData, delTX)) {
+      return;
+    }
     int newDeletionBlocks = 0;
-    try(ReferenceCountedDB containerDB =
-            BlockUtils.getDB(containerData, conf)) {
+    try (ReferenceCountedDB containerDB = BlockUtils
+        .getDB(containerData, conf)) {
       Table<String, BlockData> blockDataTable =
-              containerDB.getStore().getBlockDataTable();
+          containerDB.getStore().getBlockDataTable();
       Table<String, ChunkInfoList> deletedBlocksTable =
-              containerDB.getStore().getDeletedBlocksTable();
+          containerDB.getStore().getDeletedBlocksTable();
 
-      for (Long blkLong : delTX.getLocalIDList()) {
-        String blk = blkLong.toString();
-        BlockData blkInfo = blockDataTable.get(blk);
-        if (blkInfo != null) {
-          String deletingKey = OzoneConsts.DELETING_KEY_PREFIX + blk;
-
-          if (blockDataTable.get(deletingKey) != null
-              || deletedBlocksTable.get(blk) != null) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String.format(
-                  "Ignoring delete for block %s in container %d."
-                      + " Entry already added.", blk, containerId));
+      try (BatchOperation batch = containerDB.getStore().getBatchHandler()
+          .initBatchOperation()) {
+        for (Long blkLong : delTX.getLocalIDList()) {
+          String blk = blkLong.toString();
+          BlockData blkInfo = blockDataTable.get(blk);
+          if (blkInfo != null) {
+            String deletingKey = OzoneConsts.DELETING_KEY_PREFIX + blk;
+            if (blockDataTable.get(deletingKey) != null
+                || deletedBlocksTable.get(blk) != null) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(String.format(
+                    "Ignoring delete for block %s in container %d."
+                        + " Entry already added.", blk, containerId));
+              }
+              continue;
             }
-            continue;
-          }
-
-          try(BatchOperation batch = containerDB.getStore()
-              .getBatchHandler().initBatchOperation()) {
             // Found the block in container db,
             // use an atomic update to change its state to deleting.
             blockDataTable.putWithBatch(batch, deletingKey, blkInfo);
             blockDataTable.deleteWithBatch(batch, blk);
-            containerDB.getStore().getBatchHandler()
-                .commitBatchOperation(batch);
             newDeletionBlocks++;
             if (LOG.isDebugEnabled()) {
               LOG.debug("Transited Block {} to DELETING state in container {}",
                   blk, containerId);
             }
-          } catch (IOException e) {
-            // if some blocks failed to delete, we fail this TX,
-            // without sending this ACK to SCM, SCM will resend the TX
-            // with a certain number of retries.
-            throw new IOException(
-                "Failed to delete blocks for TXID = " + delTX.getTxID(), e);
-          }
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Block {} not found or already under deletion in"
-                + " container {}, skip deleting it.", blk, containerId);
+          } else {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Block {} not found or already under deletion in"
+                  + " container {}, skip deleting it.", blk, containerId);
+            }
           }
         }
+        updateMetaData(containerData, delTX, newDeletionBlocks, containerDB,
+            batch);
+        containerDB.getStore().getBatchHandler().commitBatchOperation(batch);
+      } catch (IOException e) {
+        // if some blocks failed to delete, we fail this TX,
+        // without sending this ACK to SCM, SCM will resend the TX
+        // with a certain number of retries.
+        throw new IOException(
+            "Failed to delete blocks for TXID = " + delTX.getTxID(), e);
       }
+    }
+  }
 
-      if (newDeletionBlocks > 0) {
-        // Finally commit the DB counters.
-        try(BatchOperation batchOperation =
-                containerDB.getStore().getBatchHandler().initBatchOperation()) {
-          Table< String, Long > metadataTable = containerDB.getStore()
-              .getMetadataTable();
+  private void updateMetaData(KeyValueContainerData containerData,
+      DeletedBlocksTransaction delTX, int newDeletionBlocks,
+      ReferenceCountedDB containerDB, BatchOperation batchOperation)
+      throws IOException {
+    if (newDeletionBlocks > 0) {
+      // Finally commit the DB counters.
+      Table<String, Long> metadataTable =
+          containerDB.getStore().getMetadataTable();
 
-          // In memory is updated only when existing delete transactionID is
-          // greater.
-          if (delTX.getTxID() > containerData.getDeleteTransactionId()) {
-            // Update in DB pending delete key count and delete transaction ID.
-            metadataTable.putWithBatch(batchOperation,
-                OzoneConsts.DELETE_TRANSACTION_KEY, delTX.getTxID());
-          }
+      // In memory is updated only when existing delete transactionID is
+      // greater.
+      if (delTX.getTxID() > containerData.getDeleteTransactionId()) {
+        // Update in DB pending delete key count and delete transaction ID.
+        metadataTable
+            .putWithBatch(batchOperation, OzoneConsts.DELETE_TRANSACTION_KEY,
+                delTX.getTxID());
+      }
 
-          long pendingDeleteBlocks =
-              containerData.getNumPendingDeletionBlocks() + newDeletionBlocks;
-          metadataTable.putWithBatch(batchOperation,
-              OzoneConsts.PENDING_DELETE_BLOCK_COUNT, pendingDeleteBlocks);
+      long pendingDeleteBlocks =
+          containerData.getNumPendingDeletionBlocks() + newDeletionBlocks;
+      metadataTable
+          .putWithBatch(batchOperation, OzoneConsts.PENDING_DELETE_BLOCK_COUNT,
+              pendingDeleteBlocks);
 
-          containerDB.getStore().getBatchHandler()
-              .commitBatchOperation(batchOperation);
+      // update pending deletion blocks count and delete transaction ID in
+      // in-memory container status
+      containerData.updateDeleteTransactionId(delTX.getTxID());
+      containerData.incrPendingDeletionBlocks(newDeletionBlocks);
+    }
+  }
 
-          // update pending deletion blocks count and delete transaction ID in
-          // in-memory container status
-          containerData.updateDeleteTransactionId(delTX.getTxID());
+  private boolean isTxnIdValid(long containerId,
+      KeyValueContainerData containerData, DeletedBlocksTransaction delTX) {
+    boolean b = true;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing Container : {}, DB path : {}", containerId,
+          containerData.getMetadataPath());
+    }
 
-          containerData.incrPendingDeletionBlocks(newDeletionBlocks);
-        }
+    if (delTX.getTxID() < containerData.getDeleteTransactionId()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Ignoring delete blocks for containerId: %d."
+                + " Outdated delete transactionId %d < %d", containerId,
+            delTX.getTxID(), containerData.getDeleteTransactionId()));
       }
+      b = false;
     }
+    return b;
   }
 
   @Override
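
The gist of the new V2 write path above: the handler no longer rewrites individual block keys, it persists the SCM transaction verbatim, keyed by its transaction ID, with the counter updates committed in the same batch. A condensed sketch using only the accessors shown in this diff (storeTxnAsIs is a hypothetical helper; the real code also calls updateMetaData):

    import java.io.IOException;
    import org.apache.hadoop.hdds.protocol.proto
        .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
    import org.apache.hadoop.hdds.utils.db.BatchOperation;
    import org.apache.hadoop.hdds.utils.db.Table;
    import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;

    // Persist the whole transaction proto, keyed by its txID, and commit
    // it together with any counter updates in a single batch.
    static void storeTxnAsIs(DatanodeStoreSchemaTwoImpl store,
        DeletedBlocksTransaction delTX) throws IOException {
      Table<Long, DeletedBlocksTransaction> delTxTable =
          store.getDeleteTransactionTable();
      try (BatchOperation batch =
               store.getBatchHandler().initBatchOperation()) {
        delTxTable.putWithBatch(batch, delTX.getTxID(), delTX);
        // The real handler also writes DELETE_TRANSACTION_KEY and
        // PENDING_DELETE_BLOCK_COUNT into this same batch (updateMetaData).
        store.getBatchHandler().commitBatchOperation(batch);
      }
    }

Keeping the put and the counter updates in one batch means the datanode either records the whole transaction or none of it.
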
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
index b03b7d7..3dab1fa 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
@@ -20,11 +20,12 @@ package org.apache.hadoop.ozone.container.keyvalue.statemachine.background;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.UUID;
 import java.util.LinkedList;
+import java.util.Objects;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -32,14 +33,15 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.utils.BackgroundService;
-import org.apache.hadoop.hdds.utils.BackgroundTask;
-import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
 import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter;
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.TopNOrderedContainerDeletionChoosingPolicy;
@@ -50,14 +52,22 @@ import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverSe
 import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 
 import com.google.common.collect.Lists;
+
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2;
+
 import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,7 +76,7 @@ import org.slf4j.LoggerFactory;
  * A per-datanode container block deleting service takes charge
  * of deleting stale ozone blocks.
  */
-// TODO: Fix BlockDeletingService to work with new StorageLayer
+
 public class BlockDeletingService extends BackgroundService {
 
   private static final Logger LOG =
@@ -244,21 +254,54 @@ public class BlockDeletingService extends BackgroundService {
 
     @Override
     public BackgroundTaskResult call() throws Exception {
-      ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
+      ContainerBackgroundTaskResult crr;
       final Container container = ozoneContainer.getContainerSet()
           .getContainer(containerData.getContainerID());
       container.writeLock();
+      File dataDir = new File(containerData.getChunksPath());
       long startTime = Time.monotonicNow();
       // Scan container's db and get list of under deletion blocks
       try (ReferenceCountedDB meta = BlockUtils.getDB(containerData, conf)) {
+        if (containerData.getSchemaVersion().equals(SCHEMA_V1)) {
+          crr = deleteViaSchema1(meta, container, dataDir, startTime);
+        } else if (containerData.getSchemaVersion().equals(SCHEMA_V2)) {
+          crr = deleteViaSchema2(meta, container, dataDir, startTime);
+        } else {
+          throw new UnsupportedOperationException(
+              "Only schema version 1 and schema version 2 are supported.");
+        }
+        return crr;
+      } finally {
+        container.writeUnlock();
+      }
+    }
+
+    public boolean checkDataDir(File dataDir) {
+      boolean b = true;
+      if (!dataDir.exists() || !dataDir.isDirectory()) {
+        LOG.error("Invalid container data dir {} : "
+            + "does not exist or not a directory", dataDir.getAbsolutePath());
+        b = false;
+      }
+      return b;
+    }
+
+    public ContainerBackgroundTaskResult deleteViaSchema1(
+        ReferenceCountedDB meta, Container container, File dataDir,
+        long startTime) throws IOException {
+      ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
+      if (!checkDataDir(dataDir)) {
+        return crr;
+      }
+      try {
         Table<String, BlockData> blockDataTable =
-                meta.getStore().getBlockDataTable();
+            meta.getStore().getBlockDataTable();
 
         // # of blocks to delete is throttled
         KeyPrefixFilter filter = MetadataKeyFilters.getDeletingKeyFilter();
         List<? extends Table.KeyValue<String, BlockData>> toDeleteBlocks =
             blockDataTable.getSequentialRangeKVs(null, blockLimitPerTask,
-                    filter);
+                filter);
         if (toDeleteBlocks.isEmpty()) {
           LOG.debug("No under deletion block found in container : {}",
               containerData.getContainerID());
@@ -267,12 +310,6 @@ public class BlockDeletingService extends BackgroundService {
         List<String> succeedBlocks = new LinkedList<>();
         LOG.debug("Container : {}, To-Delete blocks : {}",
             containerData.getContainerID(), toDeleteBlocks.size());
-        File dataDir = new File(containerData.getChunksPath());
-        if (!dataDir.exists() || !dataDir.isDirectory()) {
-          LOG.error("Invalid container data dir {} : "
-              + "does not exist or not a directory", dataDir.getAbsolutePath());
-          return crr;
-        }
 
         Handler handler = Objects.requireNonNull(ozoneContainer.getDispatcher()
             .getHandler(container.getContainerType()));
@@ -292,7 +329,7 @@ public class BlockDeletingService extends BackgroundService {
 
         // Once blocks are deleted... remove the blockID from blockDataTable.
         try(BatchOperation batch = meta.getStore().getBatchHandler()
-                .initBatchOperation()) {
+            .initBatchOperation()) {
           for (String entry : succeedBlocks) {
             blockDataTable.deleteWithBatch(batch, entry);
           }
@@ -312,8 +349,106 @@ public class BlockDeletingService extends BackgroundService {
         }
         crr.addAll(succeedBlocks);
         return crr;
-      } finally {
-        container.writeUnlock();
+      } catch (IOException exception) {
+        LOG.warn(
+            "Deletion operation was not successful for container: " + container
+                .getContainerData().getContainerID(), exception);
+        throw exception;
+      }
+    }
+
+    public ContainerBackgroundTaskResult deleteViaSchema2(
+        ReferenceCountedDB meta, Container container, File dataDir,
+        long startTime) throws IOException {
+      ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
+      if (!checkDataDir(dataDir)) {
+        return crr;
+      }
+      try {
+        Table<String, BlockData> blockDataTable =
+            meta.getStore().getBlockDataTable();
+        DatanodeStore ds = meta.getStore();
+        DatanodeStoreSchemaTwoImpl dnStoreTwoImpl =
+            (DatanodeStoreSchemaTwoImpl) ds;
+        Table<Long, DeletedBlocksTransaction>
+            deleteTxns = dnStoreTwoImpl.getDeleteTransactionTable();
+        List<DeletedBlocksTransaction> delBlocks = new ArrayList<>();
+        int totalBlocks = 0;
+        try (TableIterator<Long,
+            ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
+            dnStoreTwoImpl.getDeleteTransactionTable().iterator()) {
+          while (iter.hasNext() && (totalBlocks < blockLimitPerTask)) {
+            DeletedBlocksTransaction delTx = iter.next().getValue();
+            totalBlocks += delTx.getLocalIDList().size();
+            delBlocks.add(delTx);
+          }
+        }
+
+        if (delBlocks.isEmpty()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("No transaction found in container : {}",
+                containerData.getContainerID());
+          }
+          return crr;
+        }
+
+        LOG.debug("Container : {}, To-Delete blocks : {}",
+            containerData.getContainerID(), delBlocks.size());
+
+        Handler handler = Objects.requireNonNull(ozoneContainer.getDispatcher()
+            .getHandler(container.getContainerType()));
+
+        deleteTransactions(delBlocks, handler, blockDataTable, container);
+
+        // Once blocks are deleted... remove the blockID from blockDataTable
+        // and also remove the transactions from txnTable.
+        try(BatchOperation batch = meta.getStore().getBatchHandler()
+            .initBatchOperation()) {
+          for (DeletedBlocksTransaction delTx : delBlocks) {
+            deleteTxns.deleteWithBatch(batch, delTx.getTxID());
+            for (Long blk : delTx.getLocalIDList()) {
+              String bID = blk.toString();
+              meta.getStore().getBlockDataTable().deleteWithBatch(batch, bID);
+            }
+          }
+          meta.getStore().getBatchHandler().commitBatchOperation(batch);
+          containerData.updateAndCommitDBCounters(meta, batch,
+              totalBlocks);
+          // update count of pending deletion blocks and block count in
+          // in-memory container status.
+          containerData.decrPendingDeletionBlocks(totalBlocks);
+          containerData.decrKeyCount(totalBlocks);
+        }
+
+        LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms",
+            containerData.getContainerID(), totalBlocks,
+            Time.monotonicNow() - startTime);
+
+        return crr;
+      } catch (IOException exception) {
+        LOG.warn(
+            "Deletion operation was not successful for container: " + container
+                .getContainerData().getContainerID(), exception);
+        throw exception;
+      }
+    }
+
+    private void deleteTransactions(List<DeletedBlocksTransaction> delBlocks,
+        Handler handler, Table<String, BlockData> blockDataTable,
+        Container container) throws IOException {
+      for (DeletedBlocksTransaction entry : delBlocks) {
+        for (Long blkLong : entry.getLocalIDList()) {
+          String blk = blkLong.toString();
+          BlockData blkInfo = blockDataTable.get(blk);
+          LOG.debug("Deleting block {}", blk);
+          try {
+            handler.deleteBlock(container, blkInfo);
+          } catch (InvalidProtocolBufferException e) {
+            LOG.error("Failed to parse block info for block {}", blk, e);
+          } catch (IOException e) {
+            LOG.error("Failed to delete files for block {}", blk, e);
+          }
+        }
       }
     }
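
On the read side, deleteViaSchema2 drains whole transactions instead of scanning for blocks under the deleting-key prefix. A stripped-down sketch of just the polling loop, mirroring the iterator usage above (chunk deletion and the commit batch elided):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hdds.protocol.proto
        .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
    import org.apache.hadoop.hdds.utils.db.Table;
    import org.apache.hadoop.hdds.utils.db.TableIterator;
    import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;

    // Collect pending transactions until the block throttle is reached.
    static List<DeletedBlocksTransaction> pollTxns(
        DatanodeStoreSchemaTwoImpl store, int blockLimitPerTask)
        throws IOException {
      List<DeletedBlocksTransaction> delBlocks = new ArrayList<>();
      int totalBlocks = 0;
      try (TableIterator<Long,
          ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
          store.getDeleteTransactionTable().iterator()) {
        while (iter.hasNext() && totalBlocks < blockLimitPerTask) {
          DeletedBlocksTransaction delTx = iter.next().getValue();
          totalBlocks += delTx.getLocalIDList().size();
          delBlocks.add(delTx);
        }
      }
      return delBlocks;
    }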
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
index 8895475..2fb1174 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
@@ -60,7 +60,7 @@ public abstract class AbstractDatanodeDBDefinition implements DBDefinition {
   @Override
   public DBColumnFamilyDefinition[] getColumnFamilies() {
     return new DBColumnFamilyDefinition[] {getBlockDataColumnFamily(),
-            getMetadataColumnFamily(), getDeletedBlocksColumnFamily()};
+        getMetadataColumnFamily(), getDeletedBlocksColumnFamily()};
   }
 
   public abstract DBColumnFamilyDefinition<String, BlockData>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java
index faf399d..7d5e053 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java
@@ -88,4 +88,9 @@ public class DatanodeSchemaOneDBDefinition
       getDeletedBlocksColumnFamily() {
     return DELETED_BLOCKS;
   }
+
+  @Override
+  public DBColumnFamilyDefinition[] getColumnFamilies() {
+    return new DBColumnFamilyDefinition[] {getBlockDataColumnFamily(),
+        getMetadataColumnFamily(), getDeletedBlocksColumnFamily() };
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java
index 2ac56f2..1fabd13 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java
@@ -17,16 +17,19 @@
  */
 package org.apache.hadoop.ozone.container.metadata;
 
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
 import org.apache.hadoop.hdds.utils.db.LongCodec;
 import org.apache.hadoop.hdds.utils.db.StringCodec;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 
 /**
  * This class defines the RocksDB structure for datanodes following schema
- * version 2, where the block data, metadata, and deleted block ids are put in
- * their own separate column families.
+ * version 2, where the block data, metadata, and pending delete transactions
+ * are each stored in their own column family.
  */
 public class DatanodeSchemaTwoDBDefinition extends
         AbstractDatanodeDBDefinition {
@@ -34,7 +37,7 @@ public class DatanodeSchemaTwoDBDefinition extends
   public static final DBColumnFamilyDefinition<String, BlockData>
           BLOCK_DATA =
           new DBColumnFamilyDefinition<>(
-                  "block_data",
+                  "blockData",
                   String.class,
                   new StringCodec(),
                   BlockData.class,
@@ -52,17 +55,33 @@ public class DatanodeSchemaTwoDBDefinition extends
   public static final DBColumnFamilyDefinition<String, ChunkInfoList>
           DELETED_BLOCKS =
           new DBColumnFamilyDefinition<>(
-                  "deleted_blocks",
+                  "deletedBlocks",
                   String.class,
                   new StringCodec(),
                   ChunkInfoList.class,
                   new ChunkInfoListCodec());
 
+  public static final DBColumnFamilyDefinition<Long, DeletedBlocksTransaction>
+      DELETE_TRANSACTION =
+      new DBColumnFamilyDefinition<>(
+          "deleteTxns",
+          Long.class,
+          new LongCodec(),
+          StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction.class,
+          new DeletedBlocksTransactionCodec());
+
   protected DatanodeSchemaTwoDBDefinition(String dbPath) {
     super(dbPath);
   }
 
   @Override
+  public DBColumnFamilyDefinition[] getColumnFamilies() {
+    return new DBColumnFamilyDefinition[] {getBlockDataColumnFamily(),
+        getMetadataColumnFamily(), getDeletedBlocksColumnFamily(),
+        getDeleteTransactionsColumnFamily()};
+  }
+
+  @Override
   public DBColumnFamilyDefinition<String, BlockData>
       getBlockDataColumnFamily() {
     return BLOCK_DATA;
@@ -78,4 +97,9 @@ public class DatanodeSchemaTwoDBDefinition extends
       getDeletedBlocksColumnFamily() {
     return DELETED_BLOCKS;
   }
+
+  public DBColumnFamilyDefinition<Long, DeletedBlocksTransaction>
+      getDeleteTransactionsColumnFamily() {
+    return DELETE_TRANSACTION;
+  }
 }
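
The DELETE_TRANSACTION definition binds the "deleteTxns" column family to a LongCodec key and the new DeletedBlocksTransactionCodec value, so callers work with a typed table instead of raw bytes. A sketch of resolving it, where dbStore stands in for an already-open schema V2 store:

    // dbStore: an already-open store for a schema V2 container (assumed).
    Table<Long, DeletedBlocksTransaction> txns =
        DatanodeSchemaTwoDBDefinition.DELETE_TRANSACTION.getTable(dbStore);
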
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
index df9b8c0..db8fe6b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
@@ -18,6 +18,9 @@
 package org.apache.hadoop.ozone.container.metadata;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.utils.db.Table;
 
 import java.io.IOException;
 
@@ -26,10 +29,13 @@ import java.io.IOException;
  * three column families/tables:
  * 1. A block data table.
  * 2. A metadata table.
- * 3. A deleted blocks table.
+ * 3. A delete transaction table.
  */
 public class DatanodeStoreSchemaTwoImpl extends AbstractDatanodeStore {
 
+  private final Table<Long, DeletedBlocksTransaction>
+      deleteTransactionTable;
+
   /**
    * Constructs the datanode store and starts the DB Services.
    *
@@ -41,5 +47,11 @@ public class DatanodeStoreSchemaTwoImpl extends AbstractDatanodeStore {
       throws IOException {
     super(config, containerID, new DatanodeSchemaTwoDBDefinition(dbPath),
         openReadOnly);
+    this.deleteTransactionTable = new DatanodeSchemaTwoDBDefinition(dbPath)
+        .getDeleteTransactionsColumnFamily().getTable(getStore());
+  }
+
+  public Table<Long, DeletedBlocksTransaction> getDeleteTransactionTable() {
+    return deleteTransactionTable;
   }
 }
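
With this accessor, any holder of a ReferenceCountedDB for a schema V2 container can reach the transaction table directly. A usage sketch, assuming the cast is guarded by a schema-version check as elsewhere in the patch (txnId is an illustrative value, and Table.get may throw IOException):

    try (ReferenceCountedDB meta = BlockUtils.getDB(containerData, conf)) {
      DatanodeStore ds = meta.getStore();
      DatanodeStoreSchemaTwoImpl schemaTwo = (DatanodeStoreSchemaTwoImpl) ds;
      // Look up one pending transaction by its SCM transaction ID.
      DeletedBlocksTransaction txn =
          schemaTwo.getDeleteTransactionTable().get(txnId);
    }
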
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DeletedBlocksTransactionCodec.java
similarity index 54%
copy from hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
copy to hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DeletedBlocksTransactionCodec.java
index df9b8c0..90c26fe 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DeletedBlocksTransactionCodec.java
@@ -17,29 +17,30 @@
  */
 package org.apache.hadoop.ozone.container.metadata;
 
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 
 import java.io.IOException;
 
 /**
- * Constructs a datanode store in accordance with schema version 2, which uses
- * three column families/tables:
- * 1. A block data table.
- * 2. A metadata table.
- * 3. A deleted blocks table.
+ * Supports encoding and decoding {@link DeletedBlocksTransaction} objects.
  */
-public class DatanodeStoreSchemaTwoImpl extends AbstractDatanodeStore {
+public class DeletedBlocksTransactionCodec
+    implements Codec<DeletedBlocksTransaction> {
 
-  /**
-   * Constructs the datanode store and starts the DB Services.
-   *
-   * @param config - Ozone Configuration.
-   * @throws IOException - on Failure.
-   */
-  public DatanodeStoreSchemaTwoImpl(ConfigurationSource config,
-      long containerID, String dbPath, boolean openReadOnly)
+  @Override
+  public byte[] toPersistedFormat(
+      DeletedBlocksTransaction deletedBlocksTransaction) {
+    return deletedBlocksTransaction.toByteArray();
+  }
+
+  @Override
+  public DeletedBlocksTransaction fromPersistedFormat(byte[] rawData)
       throws IOException {
-    super(config, containerID, new DatanodeSchemaTwoDBDefinition(dbPath),
-        openReadOnly);
+    return DeletedBlocksTransaction.parseFrom(rawData);
+  }
+
+  @Override
+  public DeletedBlocksTransaction copyObject(
+      DeletedBlocksTransaction deletedBlocksTransaction) {
+    throw new UnsupportedOperationException();
   }
 }
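
The codec is a thin protobuf pass-through, so encoding and decoding round-trip losslessly. A minimal check with illustrative values (fromPersistedFormat declares IOException, so this belongs in a method that does too):

    DeletedBlocksTransaction txn = DeletedBlocksTransaction.newBuilder()
        .setTxID(1L)
        .setContainerID(42L)
        .addLocalID(100L)
        .setCount(0)
        .build();

    DeletedBlocksTransactionCodec codec = new DeletedBlocksTransactionCodec();
    byte[] raw = codec.toPersistedFormat(txn);      // raw proto bytes
    DeletedBlocksTransaction back = codec.fromPersistedFormat(raw);
    assert txn.equals(back);                        // lossless round-trip
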
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
index 2eb6a39..96d4228 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
@@ -21,9 +21,10 @@ package org.apache.hadoop.ozone.container.common;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.UUID;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -34,9 +35,13 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.MutableConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
@@ -55,7 +60,6 @@ import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
@@ -64,11 +68,14 @@ import org.apache.hadoop.ozone.container.keyvalue.impl.FilePerBlockStrategy;
 import org.apache.hadoop.ozone.container.keyvalue.impl.FilePerChunkStrategy;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.container.testutils.BlockDeletingServiceTestImpl;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 
+import static java.util.stream.Collectors.toList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
 
 import org.junit.AfterClass;
@@ -82,7 +89,11 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTA
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_VERSIONS;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2;
 import static org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion.FILE_PER_BLOCK;
+import static org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask.LOG;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -101,16 +112,38 @@ public class TestBlockDeletingService {
   private static MutableConfigurationSource conf;
 
   private final ChunkLayOutVersion layout;
+  private final String schemaVersion;
   private int blockLimitPerTask;
   private static VolumeSet volumeSet;
 
-  public TestBlockDeletingService(ChunkLayOutVersion layout) {
-    this.layout = layout;
+  public TestBlockDeletingService(LayoutInfo layoutInfo) {
+    this.layout = layoutInfo.layout;
+    this.schemaVersion = layoutInfo.schemaVersion;
   }
 
   @Parameterized.Parameters
   public static Iterable<Object[]> parameters() {
-    return ChunkLayoutTestInfo.chunkLayoutParameters();
+    return LayoutInfo.layoutList.stream().map(each -> new Object[] {each})
+        .collect(toList());
+  }
+
+  public static class LayoutInfo {
+    private final String schemaVersion;
+    private final ChunkLayOutVersion layout;
+
+    public LayoutInfo(String schemaVersion, ChunkLayOutVersion layout) {
+      this.schemaVersion = schemaVersion;
+      this.layout = layout;
+    }
+
+    private static List<LayoutInfo> layoutList = new ArrayList<>();
+    static {
+      for (ChunkLayOutVersion ch : ChunkLayOutVersion.getAllVersions()) {
+        for (String sch : SCHEMA_VERSIONS) {
+          layoutList.add(new LayoutInfo(sch, ch));
+        }
+      }
+    }
   }
 
   @BeforeClass
@@ -158,6 +191,7 @@ public class TestBlockDeletingService {
     }
     byte[] arr = randomAlphanumeric(1048576).getBytes(UTF_8);
     ChunkBuffer buffer = ChunkBuffer.wrap(ByteBuffer.wrap(arr));
+    int txnID = 0;
     for (int x = 0; x < numOfContainers; x++) {
       long containerID = ContainerTestHelper.getTestContainerID();
       KeyValueContainerData data =
@@ -165,55 +199,164 @@ public class TestBlockDeletingService {
               ContainerTestHelper.CONTAINER_MAX_SIZE,
               UUID.randomUUID().toString(), datanodeUuid);
       data.closeContainer();
+      data.setSchemaVersion(schemaVersion);
       KeyValueContainer container = new KeyValueContainer(data, conf);
       container.create(volumeSet,
           new RoundRobinVolumeChoosingPolicy(), scmId);
       containerSet.addContainer(container);
       data = (KeyValueContainerData) containerSet.getContainer(
           containerID).getContainerData();
-      long chunkLength = 100;
-      try(ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
-        for (int j = 0; j < numOfBlocksPerContainer; j++) {
-          BlockID blockID =
-              ContainerTestHelper.getTestBlockID(containerID);
-          String deleteStateName = OzoneConsts.DELETING_KEY_PREFIX +
-              blockID.getLocalID();
-          BlockData kd = new BlockData(blockID);
-          List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
-          for (int k = 0; k < numOfChunksPerBlock; k++) {
-            final String chunkName = String.format("block.%d.chunk.%d", j, k);
-            final long offset = k * chunkLength;
-            ContainerProtos.ChunkInfo info =
-                ContainerProtos.ChunkInfo.newBuilder()
-                    .setChunkName(chunkName)
-                    .setLen(chunkLength)
-                    .setOffset(offset)
-                    .setChecksumData(Checksum.getNoChecksumDataProto())
-                    .build();
-            chunks.add(info);
-            ChunkInfo chunkInfo = new ChunkInfo(chunkName, offset, chunkLength);
-            ChunkBuffer chunkData = buffer.duplicate(0, (int) chunkLength);
-            chunkManager.writeChunk(container, blockID, chunkInfo, chunkData,
-                WRITE_STAGE);
-            chunkManager.writeChunk(container, blockID, chunkInfo, chunkData,
-                COMMIT_STAGE);
-          }
-          kd.setChunks(chunks);
-          metadata.getStore().getBlockDataTable().put(
-                  deleteStateName, kd);
-          container.getContainerData().incrPendingDeletionBlocks(1);
-        }
-        container.getContainerData().setKeyCount(numOfBlocksPerContainer);
-        // Set block count, bytes used and pending delete block count.
-        metadata.getStore().getMetadataTable().put(
-                OzoneConsts.BLOCK_COUNT, (long)numOfBlocksPerContainer);
-        metadata.getStore().getMetadataTable().put(
-                OzoneConsts.CONTAINER_BYTES_USED,
-            chunkLength * numOfChunksPerBlock * numOfBlocksPerContainer);
-        metadata.getStore().getMetadataTable().put(
-                OzoneConsts.PENDING_DELETE_BLOCK_COUNT,
-                (long)numOfBlocksPerContainer);
+      if (data.getSchemaVersion().equals(SCHEMA_V1)) {
+        createPendingDeleteBlocksSchema1(numOfBlocksPerContainer, data,
+            containerID, numOfChunksPerBlock, buffer, chunkManager, container);
+      } else if (data.getSchemaVersion().equals(SCHEMA_V2)) {
+        createPendingDeleteBlocksSchema2(numOfBlocksPerContainer, txnID,
+            containerID, numOfChunksPerBlock, buffer, chunkManager, container,
+            data);
+      } else {
+        throw new UnsupportedOperationException(
+            "Only schema version 1 and schema version 2 are "
+                + "supported.");
+      }
+    }
+  }
+
+  @SuppressWarnings("checkstyle:parameternumber")
+  private void createPendingDeleteBlocksSchema1(int numOfBlocksPerContainer,
+      KeyValueContainerData data, long containerID, int numOfChunksPerBlock,
+      ChunkBuffer buffer, ChunkManager chunkManager,
+      KeyValueContainer container) {
+    BlockID blockID = null;
+    try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
+      for (int j = 0; j < numOfBlocksPerContainer; j++) {
+        blockID = ContainerTestHelper.getTestBlockID(containerID);
+        String deleteStateName =
+            OzoneConsts.DELETING_KEY_PREFIX + blockID.getLocalID();
+        BlockData kd = new BlockData(blockID);
+        List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
+        putChunksInBlock(numOfChunksPerBlock, j, chunks, buffer, chunkManager,
+            container, blockID);
+        kd.setChunks(chunks);
+        metadata.getStore().getBlockDataTable().put(deleteStateName, kd);
+        container.getContainerData().incrPendingDeletionBlocks(1);
+      }
+      updateMetaData(data, container, numOfBlocksPerContainer,
+          numOfChunksPerBlock);
+    } catch (IOException exception) {
+      LOG.info("Exception " + exception);
+      LOG.warn("Failed to put block: " + blockID + " in BlockDataTable.");
+    }
+  }
+
+  @SuppressWarnings("checkstyle:parameternumber")
+  private void createPendingDeleteBlocksSchema2(int numOfBlocksPerContainer,
+      int txnID, long containerID, int numOfChunksPerBlock, ChunkBuffer buffer,
+      ChunkManager chunkManager, KeyValueContainer container,
+      KeyValueContainerData data) {
+    List<Long> containerBlocks = new ArrayList<>();
+    int blockCount = 0;
+    for (int i = 0; i < numOfBlocksPerContainer; i++) {
+      txnID = txnID + 1;
+      BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
+      BlockData kd = new BlockData(blockID);
+      List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
+      putChunksInBlock(numOfChunksPerBlock, i, chunks, buffer, chunkManager,
+          container, blockID);
+      kd.setChunks(chunks);
+      String bID = null;
+      try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
+        bID = blockID.getLocalID() + "";
+        metadata.getStore().getBlockDataTable().put(bID, kd);
+      } catch (IOException exception) {
+        LOG.info("Exception = " + exception);
+        LOG.warn("Failed to put block: " + bID + " in BlockDataTable.");
+      }
+      container.getContainerData().incrPendingDeletionBlocks(1);
+
+      // If a single container holds more blocks than 'blockLimitPerTask',
+      // split them across (totalBlocksInContainer / blockLimitPerTask)
+      // transactions of 'blockLimitPerTask' blocks each, plus a final
+      // transaction carrying the remaining
+      // (totalBlocksInContainer % blockLimitPerTask) blocks.
+      containerBlocks.add(blockID.getLocalID());
+      blockCount++;
+      if (blockCount == blockLimitPerTask || i == (numOfBlocksPerContainer
+          - 1)) {
+        createTxn(data, containerBlocks, txnID, containerID);
+        containerBlocks.clear();
+        blockCount = 0;
+      }
+    }
+    updateMetaData(data, container, numOfBlocksPerContainer,
+        numOfChunksPerBlock);
+  }
+
+  private void createTxn(KeyValueContainerData data, List<Long> containerBlocks,
+      int txnID, long containerID) {
+    try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
+      StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction dtx =
+          StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction
+              .newBuilder().setTxID(txnID).setContainerID(containerID)
+              .addAllLocalID(containerBlocks).setCount(0).build();
+      try (BatchOperation batch = metadata.getStore().getBatchHandler()
+          .initBatchOperation()) {
+        DatanodeStore ds = metadata.getStore();
+        DatanodeStoreSchemaTwoImpl dnStoreTwoImpl =
+            (DatanodeStoreSchemaTwoImpl) ds;
+        dnStoreTwoImpl.getDeleteTransactionTable()
+            .putWithBatch(batch, (long) txnID, dtx);
+        metadata.getStore().getBatchHandler().commitBatchOperation(batch);
       }
+    } catch (IOException exception) {
+      LOG.warn("Transaction creation was not successful for txnID: " + txnID
+          + " consisting of " + containerBlocks.size() + " blocks.");
+    }
+  }
+
+  private void putChunksInBlock(int numOfChunksPerBlock, int i,
+      List<ContainerProtos.ChunkInfo> chunks, ChunkBuffer buffer,
+      ChunkManager chunkManager, KeyValueContainer container, BlockID blockID) {
+    long chunkLength = 100;
+    try {
+      for (int k = 0; k < numOfChunksPerBlock; k++) {
+        final String chunkName = String.format("block.%d.chunk.%d", i, k);
+        final long offset = k * chunkLength;
+        ContainerProtos.ChunkInfo info =
+            ContainerProtos.ChunkInfo.newBuilder().setChunkName(chunkName)
+                .setLen(chunkLength).setOffset(offset)
+                .setChecksumData(Checksum.getNoChecksumDataProto()).build();
+        chunks.add(info);
+        ChunkInfo chunkInfo = new ChunkInfo(chunkName, offset, chunkLength);
+        ChunkBuffer chunkData = buffer.duplicate(0, (int) chunkLength);
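+        // Mirror the real write path: write the chunk data first, then
+        // commit it.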
+        chunkManager
+            .writeChunk(container, blockID, chunkInfo, chunkData, WRITE_STAGE);
+        chunkManager
+            .writeChunk(container, blockID, chunkInfo, chunkData, COMMIT_STAGE);
+      }
+    } catch (IOException ex) {
+      LOG.warn("Putting chunks in blocks was not successful for BlockID: "
+          + blockID, ex);
+    }
+  }
+
+  private void updateMetaData(KeyValueContainerData data,
+      KeyValueContainer container, int numOfBlocksPerContainer,
+      int numOfChunksPerBlock) {
+    long chunkLength = 100;
+    try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
+      container.getContainerData().setKeyCount(numOfBlocksPerContainer);
+      // Set block count, bytes used and pending delete block count.
+      metadata.getStore().getMetadataTable()
+          .put(OzoneConsts.BLOCK_COUNT, (long) numOfBlocksPerContainer);
+      metadata.getStore().getMetadataTable()
+          .put(OzoneConsts.CONTAINER_BYTES_USED,
+              chunkLength * numOfChunksPerBlock * numOfBlocksPerContainer);
+      metadata.getStore().getMetadataTable()
+          .put(OzoneConsts.PENDING_DELETE_BLOCK_COUNT,
+              (long) numOfBlocksPerContainer);
+    } catch (IOException exception) {
+      LOG.warn("Metadata update was not successful for container: "
+          + container, exception);
     }
   }
 
@@ -231,11 +374,32 @@ public class TestBlockDeletingService {
    * Get under deletion blocks count from DB,
    * note this info is parsed from container.db.
    */
-  private int getUnderDeletionBlocksCount(ReferenceCountedDB meta)
-      throws IOException {
-    return meta.getStore().getBlockDataTable()
-        .getRangeKVs(null, 100,
-        MetadataKeyFilters.getDeletingKeyFilter()).size();
+  private int getUnderDeletionBlocksCount(ReferenceCountedDB meta,
+      KeyValueContainerData data) throws IOException {
+    if (data.getSchemaVersion().equals(SCHEMA_V1)) {
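+      // Schema V1 marks blocks pending deletion with a "deleting" key
+      // prefix in the block data table.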
+      return meta.getStore().getBlockDataTable()
+          .getRangeKVs(null, 100, MetadataKeyFilters.getDeletingKeyFilter())
+          .size();
+    } else if (data.getSchemaVersion().equals(SCHEMA_V2)) {
+      int pendingBlocks = 0;
+      DatanodeStore ds = meta.getStore();
+      DatanodeStoreSchemaTwoImpl dnStoreTwoImpl =
+          (DatanodeStoreSchemaTwoImpl) ds;
+      try (
+          TableIterator<Long, ? extends Table.KeyValue<Long,
+              StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction>>
+              iter = dnStoreTwoImpl.getDeleteTransactionTable().iterator()) {
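+        // Schema V2 stores whole delete transactions; each one lists the
+        // local IDs of its pending blocks, so summing the list sizes gives
+        // the pending-deletion count.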
+        while (iter.hasNext()) {
+          StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction
+              delTx = iter.next().getValue();
+          pendingBlocks += delTx.getLocalIDList().size();
+        }
+      }
+      return pendingBlocks;
+    } else {
+      throw new UnsupportedOperationException(
+          "Only schema version 1 and schema version 2 are supported.");
+    }
   }
 
 
@@ -261,6 +425,7 @@ public class TestBlockDeletingService {
     // Ensure 1 container was created
     List<ContainerData> containerData = Lists.newArrayList();
     containerSet.listContainer(0L, 1, containerData);
     Assert.assertEquals(1, containerData.size());
+    KeyValueContainerData data = (KeyValueContainerData) containerData.get(0);
 
     try(ReferenceCountedDB meta = BlockUtils.getDB(
@@ -280,7 +445,7 @@ public class TestBlockDeletingService {
       Assert.assertEquals(0, transactionId);
 
       // Ensure there are 3 blocks under deletion and 0 deleted blocks
-      Assert.assertEquals(3, getUnderDeletionBlocksCount(meta));
+      Assert.assertEquals(3, getUnderDeletionBlocksCount(meta, data));
       Assert.assertEquals(3, meta.getStore().getMetadataTable()
           .get(OzoneConsts.PENDING_DELETE_BLOCK_COUNT).longValue());
 
@@ -348,6 +513,9 @@ public class TestBlockDeletingService {
   public void testBlockDeletionTimeout() throws Exception {
     conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10);
     conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 2);
+    this.blockLimitPerTask =
+        conf.getInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER,
+            OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT);
     ContainerSet containerSet = new ContainerSet();
     createToDeleteBlocks(containerSet, 1, 3, 1);
     ContainerMetrics metrics = ContainerMetrics.create(conf);
@@ -394,7 +562,7 @@ public class TestBlockDeletingService {
       LogCapturer newLog = LogCapturer.captureLogs(BackgroundService.LOG);
       GenericTestUtils.waitFor(() -> {
         try {
-          return getUnderDeletionBlocksCount(meta) == 0;
+          return getUnderDeletionBlocksCount(meta, data) == 0;
         } catch (IOException ignored) {
         }
         return false;
@@ -445,6 +613,9 @@ public class TestBlockDeletingService {
         TopNOrderedContainerDeletionChoosingPolicy.class.getName());
     conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 1);
     conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 1);
+    this.blockLimitPerTask =
+        conf.getInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER,
+            OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT);
     ContainerSet containerSet = new ContainerSet();
 
     int containerCount = 2;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 548f073..7bccd8e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -92,11 +92,15 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
-import static org.apache.hadoop.hdds.HddsConfigKeys.*;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Test class that exercises the StorageContainerManager.
@@ -272,8 +276,6 @@ public class TestStorageContainerManager {
 
       Map<Long, List<Long>> containerBlocks = createDeleteTXLog(delLog,
           keyLocations, helper);
-      Set<Long> containerIDs = containerBlocks.keySet();
-
       // Verify a few TX gets created in the TX log.
       Assert.assertTrue(delLog.getNumOfValidTransactions() > 0);
 
@@ -289,8 +291,7 @@ public class TestStorageContainerManager {
           return false;
         }
       }, 1000, 10000);
-      Assert.assertTrue(helper.getAllBlocks(containerIDs).isEmpty());
-
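+      // With schema V2 the datanode stores the delete transactions as-is,
+      // so verify them against its delete transaction table.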
+      Assert.assertTrue(helper.verifyBlocksWithTxnTable(containerBlocks));
       // Continue the work, add some TXs that with known container names,
       // but unknown block IDs.
       for (Long containerID : containerBlocks.keySet()) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
index c67fe30..fe0e075 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.ozone;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -32,9 +33,14 @@ import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -137,6 +143,30 @@ public class TestStorageContainerManagerHelper {
     return allBlocks;
   }
 
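+  /**
+   * Verifies that, for each container, the block local IDs recorded in the
+   * schema V2 delete transaction table match the expected deleted blocks.
+   */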
+  public boolean verifyBlocksWithTxnTable(Map<Long, List<Long>> containerBlocks)
+      throws IOException {
+    for (Long containerID : containerBlocks.keySet()) {
+      // Use try-with-resources so the DB handle is released even when
+      // verification fails early or getRangeKVs throws.
+      try (ReferenceCountedDB meta = getContainerMetadata(containerID)) {
+        DatanodeStore ds = meta.getStore();
+        DatanodeStoreSchemaTwoImpl dnStoreTwoImpl =
+            (DatanodeStoreSchemaTwoImpl) ds;
+        List<? extends Table.KeyValue<Long, DeletedBlocksTransaction>>
+            txnsInTxnTable = dnStoreTwoImpl.getDeleteTransactionTable()
+            .getRangeKVs(null, Integer.MAX_VALUE, null);
+        List<Long> blockIDs = new ArrayList<>();
+        for (Table.KeyValue<Long, DeletedBlocksTransaction> txn :
+            txnsInTxnTable) {
+          blockIDs.addAll(txn.getValue().getLocalIDList());
+        }
+        // getRangeKVs returns transactions in txnID order, so the
+        // concatenated local IDs should match the insertion order.
+        if (!blockIDs.equals(containerBlocks.get(containerID))) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
   private ReferenceCountedDB getContainerMetadata(Long containerID)
       throws IOException {
     ContainerWithPipeline containerWithPipeline = cluster


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org