Posted to commits@ozone.apache.org by lj...@apache.org on 2021/01/06 14:47:21 UTC

[ozone] 02/02: HDDS-4369. Datanode should store the delete transaction as is in rocksDB (#1702)

This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 431e9097b4d0e7f3d5de04ee86cc8d4ba0c05e30
Author: Aryan Gupta <44...@users.noreply.github.com>
AuthorDate: Wed Jan 6 20:15:44 2021 +0530

    HDDS-4369. Datanode should store the delete transaction as is in rocksDB (#1702)
---
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   6 +-
 .../commandhandler/DeleteBlocksCommandHandler.java | 191 ++++++++------
 .../background/BlockDeletingService.java           | 171 +++++++++++--
 .../metadata/AbstractDatanodeDBDefinition.java     |   2 +-
 .../metadata/DatanodeSchemaOneDBDefinition.java    |   5 +
 .../metadata/DatanodeSchemaTwoDBDefinition.java    |  32 ++-
 .../metadata/DatanodeStoreSchemaTwoImpl.java       |  14 +-
 ...mpl.java => DeletedBlocksTransactionCodec.java} |  35 +--
 .../container/common/TestBlockDeletingService.java | 279 +++++++++++++++++----
 .../hadoop/ozone/TestStorageContainerManager.java  |  13 +-
 .../ozone/TestStorageContainerManagerHelper.java   |  30 +++
 11 files changed, 604 insertions(+), 174 deletions(-)
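
The heart of the change: with schema version 2, the datanode persists each DeletedBlocksTransaction protobuf unmodified in a dedicated "deleteTxns" column family, keyed by transaction ID, instead of flattening it into per-block deleting markers. A minimal sketch of that write path, using only names that appear in the diff below (containerDB is assumed to be an open ReferenceCountedDB for a schema V2 container):

    // Sketch: store the delete transaction as-is (schema V2 write path).
    DatanodeStore ds = containerDB.getStore();
    DatanodeStoreSchemaTwoImpl schemaTwo = (DatanodeStoreSchemaTwoImpl) ds;
    Table<Long, DeletedBlocksTransaction> delTxTable =
        schemaTwo.getDeleteTransactionTable();
    try (BatchOperation batch = ds.getBatchHandler().initBatchOperation()) {
      // The transaction is written unmodified, keyed by its txID.
      delTxTable.putWithBatch(batch, delTX.getTxID(), delTX);
      ds.getBatchHandler().commitBatchOperation(batch);
    }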

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index d2d6e35..9836452 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -254,11 +254,15 @@ public final class OzoneConsts {
   // versions, requiring this property to be tracked on a per container basis.
   // V1: All data in default column family.
   public static final String SCHEMA_V1 = "1";
-  // V2: Metadata, block data, and deleted blocks in their own column families.
+  // V2: Metadata, block data, and delete transactions in their own
+  // column families.
   public static final String SCHEMA_V2 = "2";
   // Most recent schema version that all new containers should be created with.
   public static final String SCHEMA_LATEST = SCHEMA_V2;
 
+  public static final String[] SCHEMA_VERSIONS =
+      new String[] {SCHEMA_V1, SCHEMA_V2};
+
   // Supported store types.
   public static final String OZONE = "ozone";
   public static final String S3 = "s3";
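
The new SCHEMA_VERSIONS array gives callers one place to enumerate the supported versions. A small sketch of a validity check built on it; the isSupportedSchema helper is hypothetical, not part of this patch:

    // Hypothetical helper (not in this patch): validate a schema version
    // against OzoneConsts.SCHEMA_VERSIONS.
    static boolean isSupportedSchema(String version) {
      return java.util.Arrays.asList(SCHEMA_VERSIONS).contains(version);
    }

The handlers changed below throw UnsupportedOperationException for any version outside this set.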
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index 91ab4c9..10e6797 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.statemachine
     .SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlockCommandStatus;
@@ -59,6 +61,8 @@ import java.util.function.Consumer;
 
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.CONTAINER_NOT_FOUND;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2;
 
 /**
  * Handle block deletion commands.
@@ -116,6 +120,7 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
             DeleteBlockTransactionResult.newBuilder();
         txResultBuilder.setTxID(entry.getTxID());
         long containerId = entry.getContainerID();
+        int newDeletionBlocks = 0;
         try {
           Container cont = containerSet.getContainer(containerId);
           if (cont == null) {
@@ -129,7 +134,16 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
                 cont.getContainerData();
             cont.writeLock();
             try {
-              deleteKeyValueContainerBlocks(containerData, entry);
+              if (containerData.getSchemaVersion().equals(SCHEMA_V1)) {
+                markBlocksForDeletionSchemaV1(containerData, entry);
+              } else if (containerData.getSchemaVersion().equals(SCHEMA_V2)) {
+                markBlocksForDeletionSchemaV2(containerData, entry,
+                    newDeletionBlocks, entry.getTxID());
+              } else {
+                throw new UnsupportedOperationException(
+                    "Only schema version 1 and schema version 2 are "
+                        + "supported.");
+              }
             } finally {
               cont.writeUnlock();
             }
@@ -187,107 +201,140 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
    * @param delTX a block deletion transaction.
    * @throws IOException if I/O error occurs.
    */
-  private void deleteKeyValueContainerBlocks(
-      KeyValueContainerData containerData, DeletedBlocksTransaction delTX)
-      throws IOException {
+
+  private void markBlocksForDeletionSchemaV2(
+      KeyValueContainerData containerData, DeletedBlocksTransaction delTX,
+      int newDeletionBlocks, long txnID) throws IOException {
     long containerId = delTX.getContainerID();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing Container : {}, DB path : {}", containerId,
-          containerData.getMetadataPath());
+    if (!isTxnIdValid(containerId, containerData, delTX)) {
+      return;
     }
-
-    if (delTX.getTxID() < containerData.getDeleteTransactionId()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("Ignoring delete blocks for containerId: %d."
-                + " Outdated delete transactionId %d < %d", containerId,
-            delTX.getTxID(), containerData.getDeleteTransactionId()));
+    try (ReferenceCountedDB containerDB = BlockUtils
+        .getDB(containerData, conf)) {
+      DatanodeStore ds = containerDB.getStore();
+      DatanodeStoreSchemaTwoImpl dnStoreTwoImpl =
+          (DatanodeStoreSchemaTwoImpl) ds;
+      Table<Long, DeletedBlocksTransaction> delTxTable =
+          dnStoreTwoImpl.getDeleteTransactionTable();
+      try (BatchOperation batch = containerDB.getStore().getBatchHandler()
+          .initBatchOperation()) {
+        delTxTable.putWithBatch(batch, txnID, delTX);
+        newDeletionBlocks += delTX.getLocalIDList().size();
+        updateMetaData(containerData, delTX, newDeletionBlocks, containerDB,
+            batch);
+        containerDB.getStore().getBatchHandler().commitBatchOperation(batch);
       }
-      return;
     }
+  }
 
+  private void markBlocksForDeletionSchemaV1(
+      KeyValueContainerData containerData, DeletedBlocksTransaction delTX)
+      throws IOException {
+    long containerId = delTX.getContainerID();
+    if (!isTxnIdValid(containerId, containerData, delTX)) {
+      return;
+    }
     int newDeletionBlocks = 0;
-    try(ReferenceCountedDB containerDB =
-            BlockUtils.getDB(containerData, conf)) {
+    try (ReferenceCountedDB containerDB = BlockUtils
+        .getDB(containerData, conf)) {
       Table<String, BlockData> blockDataTable =
-              containerDB.getStore().getBlockDataTable();
+          containerDB.getStore().getBlockDataTable();
       Table<String, ChunkInfoList> deletedBlocksTable =
-              containerDB.getStore().getDeletedBlocksTable();
+          containerDB.getStore().getDeletedBlocksTable();
 
-      for (Long blkLong : delTX.getLocalIDList()) {
-        String blk = blkLong.toString();
-        BlockData blkInfo = blockDataTable.get(blk);
-        if (blkInfo != null) {
-          String deletingKey = OzoneConsts.DELETING_KEY_PREFIX + blk;
-
-          if (blockDataTable.get(deletingKey) != null
-              || deletedBlocksTable.get(blk) != null) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String.format(
-                  "Ignoring delete for block %s in container %d."
-                      + " Entry already added.", blk, containerId));
+      try (BatchOperation batch = containerDB.getStore().getBatchHandler()
+          .initBatchOperation()) {
+        for (Long blkLong : delTX.getLocalIDList()) {
+          String blk = blkLong.toString();
+          BlockData blkInfo = blockDataTable.get(blk);
+          if (blkInfo != null) {
+            String deletingKey = OzoneConsts.DELETING_KEY_PREFIX + blk;
+            if (blockDataTable.get(deletingKey) != null
+                || deletedBlocksTable.get(blk) != null) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(String.format(
+                    "Ignoring delete for block %s in container %d."
+                        + " Entry already added.", blk, containerId));
+              }
+              continue;
             }
-            continue;
-          }
-
-          try(BatchOperation batch = containerDB.getStore()
-              .getBatchHandler().initBatchOperation()) {
             // Found the block in container db,
             // use an atomic update to change its state to deleting.
             blockDataTable.putWithBatch(batch, deletingKey, blkInfo);
             blockDataTable.deleteWithBatch(batch, blk);
-            containerDB.getStore().getBatchHandler()
-                .commitBatchOperation(batch);
             newDeletionBlocks++;
             if (LOG.isDebugEnabled()) {
               LOG.debug("Transited Block {} to DELETING state in container {}",
                   blk, containerId);
             }
-          } catch (IOException e) {
-            // if some blocks failed to delete, we fail this TX,
-            // without sending this ACK to SCM, SCM will resend the TX
-            // with a certain number of retries.
-            throw new IOException(
-                "Failed to delete blocks for TXID = " + delTX.getTxID(), e);
-          }
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Block {} not found or already under deletion in"
-                + " container {}, skip deleting it.", blk, containerId);
+          } else {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Block {} not found or already under deletion in"
+                  + " container {}, skip deleting it.", blk, containerId);
+            }
           }
         }
+        updateMetaData(containerData, delTX, newDeletionBlocks, containerDB,
+            batch);
+        containerDB.getStore().getBatchHandler().commitBatchOperation(batch);
+      } catch (IOException e) {
+        // If some blocks fail to delete, we fail this TX without sending
+        // the ACK to SCM; SCM will then resend the TX with a certain
+        // number of retries.
+        throw new IOException(
+            "Failed to delete blocks for TXID = " + delTX.getTxID(), e);
       }
+    }
+  }
 
-      if (newDeletionBlocks > 0) {
-        // Finally commit the DB counters.
-        try(BatchOperation batchOperation =
-                containerDB.getStore().getBatchHandler().initBatchOperation()) {
-          Table< String, Long > metadataTable = containerDB.getStore()
-              .getMetadataTable();
+  private void updateMetaData(KeyValueContainerData containerData,
+      DeletedBlocksTransaction delTX, int newDeletionBlocks,
+      ReferenceCountedDB containerDB, BatchOperation batchOperation)
+      throws IOException {
+    if (newDeletionBlocks > 0) {
+      // Finally commit the DB counters.
+      Table<String, Long> metadataTable =
+          containerDB.getStore().getMetadataTable();
 
-          // In memory is updated only when existing delete transactionID is
-          // greater.
-          if (delTX.getTxID() > containerData.getDeleteTransactionId()) {
-            // Update in DB pending delete key count and delete transaction ID.
-            metadataTable.putWithBatch(batchOperation,
-                OzoneConsts.DELETE_TRANSACTION_KEY, delTX.getTxID());
-          }
+      // In memory is updated only when existing delete transactionID is
+      // greater.
+      if (delTX.getTxID() > containerData.getDeleteTransactionId()) {
+        // Update in DB pending delete key count and delete transaction ID.
+        metadataTable
+            .putWithBatch(batchOperation, OzoneConsts.DELETE_TRANSACTION_KEY,
+                delTX.getTxID());
+      }
 
-          long pendingDeleteBlocks =
-              containerData.getNumPendingDeletionBlocks() + newDeletionBlocks;
-          metadataTable.putWithBatch(batchOperation,
-              OzoneConsts.PENDING_DELETE_BLOCK_COUNT, pendingDeleteBlocks);
+      long pendingDeleteBlocks =
+          containerData.getNumPendingDeletionBlocks() + newDeletionBlocks;
+      metadataTable
+          .putWithBatch(batchOperation, OzoneConsts.PENDING_DELETE_BLOCK_COUNT,
+              pendingDeleteBlocks);
 
-          containerDB.getStore().getBatchHandler()
-              .commitBatchOperation(batchOperation);
+      // update pending deletion blocks count and delete transaction ID in
+      // in-memory container status
+      containerData.updateDeleteTransactionId(delTX.getTxID());
+      containerData.incrPendingDeletionBlocks(newDeletionBlocks);
+    }
+  }
 
-          // update pending deletion blocks count and delete transaction ID in
-          // in-memory container status
-          containerData.updateDeleteTransactionId(delTX.getTxID());
+  private boolean isTxnIdValid(long containerId,
+      KeyValueContainerData containerData, DeletedBlocksTransaction delTX) {
+    boolean b = true;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing Container : {}, DB path : {}", containerId,
+          containerData.getMetadataPath());
+    }
 
-          containerData.incrPendingDeletionBlocks(newDeletionBlocks);
-        }
+    if (delTX.getTxID() < containerData.getDeleteTransactionId()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Ignoring delete blocks for containerId: %d."
+                + " Outdated delete transactionId %d < %d", containerId,
+            delTX.getTxID(), containerData.getDeleteTransactionId()));
       }
+      b = false;
     }
+    return b;
   }
 
   @Override
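
For contrast with the V2 path, schema V1 still flattens the transaction into per-block markers: inside one batch, each block entry is re-keyed under OzoneConsts.DELETING_KEY_PREFIX and its original key removed, so the transition is atomic on commit. A condensed sketch of that marking step, assuming blockDataTable, batch, and delTX as in markBlocksForDeletionSchemaV1:

    // Sketch: schema V1 marking step, condensed.
    for (Long localId : delTX.getLocalIDList()) {
      String blk = localId.toString();
      BlockData blkInfo = blockDataTable.get(blk);
      if (blkInfo != null) {
        // Re-key under the deleting prefix; delete the original key in the
        // same batch so the rename is atomic when the batch commits.
        blockDataTable.putWithBatch(batch,
            OzoneConsts.DELETING_KEY_PREFIX + blk, blkInfo);
        blockDataTable.deleteWithBatch(batch, blk);
      }
    }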
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
index b03b7d7..3dab1fa 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
@@ -20,11 +20,12 @@ package org.apache.hadoop.ozone.container.keyvalue.statemachine.background;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.UUID;
 import java.util.LinkedList;
+import java.util.Objects;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -32,14 +33,15 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.utils.BackgroundService;
-import org.apache.hadoop.hdds.utils.BackgroundTask;
-import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
 import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter;
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.TopNOrderedContainerDeletionChoosingPolicy;
@@ -50,14 +52,22 @@ import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverSe
 import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 
 import com.google.common.collect.Lists;
+
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2;
+
 import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,7 +76,7 @@ import org.slf4j.LoggerFactory;
 * A per-datanode container block deleting service that takes charge
 * of deleting stale ozone blocks.
  */
-// TODO: Fix BlockDeletingService to work with new StorageLayer
+
 public class BlockDeletingService extends BackgroundService {
 
   private static final Logger LOG =
@@ -244,21 +254,54 @@ public class BlockDeletingService extends BackgroundService {
 
     @Override
     public BackgroundTaskResult call() throws Exception {
-      ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
+      ContainerBackgroundTaskResult crr;
       final Container container = ozoneContainer.getContainerSet()
           .getContainer(containerData.getContainerID());
       container.writeLock();
+      File dataDir = new File(containerData.getChunksPath());
       long startTime = Time.monotonicNow();
       // Scan container's db and get list of under deletion blocks
       try (ReferenceCountedDB meta = BlockUtils.getDB(containerData, conf)) {
+        if (containerData.getSchemaVersion().equals(SCHEMA_V1)) {
+          crr = deleteViaSchema1(meta, container, dataDir, startTime);
+        } else if (containerData.getSchemaVersion().equals(SCHEMA_V2)) {
+          crr = deleteViaSchema2(meta, container, dataDir, startTime);
+        } else {
+          throw new UnsupportedOperationException(
+              "Only schema version 1 and schema version 2 are supported.");
+        }
+        return crr;
+      } finally {
+        container.writeUnlock();
+      }
+    }
+
+    public boolean checkDataDir(File dataDir) {
+      boolean b = true;
+      if (!dataDir.exists() || !dataDir.isDirectory()) {
+        LOG.error("Invalid container data dir {} : "
+            + "does not exist or not a directory", dataDir.getAbsolutePath());
+        b = false;
+      }
+      return b;
+    }
+
+    public ContainerBackgroundTaskResult deleteViaSchema1(
+        ReferenceCountedDB meta, Container container, File dataDir,
+        long startTime) throws IOException {
+      ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
+      if (!checkDataDir(dataDir)) {
+        return crr;
+      }
+      try {
         Table<String, BlockData> blockDataTable =
-                meta.getStore().getBlockDataTable();
+            meta.getStore().getBlockDataTable();
 
         // # of blocks to delete is throttled
         KeyPrefixFilter filter = MetadataKeyFilters.getDeletingKeyFilter();
         List<? extends Table.KeyValue<String, BlockData>> toDeleteBlocks =
             blockDataTable.getSequentialRangeKVs(null, blockLimitPerTask,
-                    filter);
+                filter);
         if (toDeleteBlocks.isEmpty()) {
           LOG.debug("No under deletion block found in container : {}",
               containerData.getContainerID());
@@ -267,12 +310,6 @@ public class BlockDeletingService extends BackgroundService {
         List<String> succeedBlocks = new LinkedList<>();
         LOG.debug("Container : {}, To-Delete blocks : {}",
             containerData.getContainerID(), toDeleteBlocks.size());
-        File dataDir = new File(containerData.getChunksPath());
-        if (!dataDir.exists() || !dataDir.isDirectory()) {
-          LOG.error("Invalid container data dir {} : "
-              + "does not exist or not a directory", dataDir.getAbsolutePath());
-          return crr;
-        }
 
         Handler handler = Objects.requireNonNull(ozoneContainer.getDispatcher()
             .getHandler(container.getContainerType()));
@@ -292,7 +329,7 @@ public class BlockDeletingService extends BackgroundService {
 
         // Once blocks are deleted... remove the blockID from blockDataTable.
         try(BatchOperation batch = meta.getStore().getBatchHandler()
-                .initBatchOperation()) {
+            .initBatchOperation()) {
           for (String entry : succeedBlocks) {
             blockDataTable.deleteWithBatch(batch, entry);
           }
@@ -312,8 +349,106 @@ public class BlockDeletingService extends BackgroundService {
         }
         crr.addAll(succeedBlocks);
         return crr;
-      } finally {
-        container.writeUnlock();
+      } catch (IOException exception) {
+        LOG.warn(
+            "Deletion operation was not successful for container: " + container
+                .getContainerData().getContainerID(), exception);
+        throw exception;
+      }
+    }
+
+    public ContainerBackgroundTaskResult deleteViaSchema2(
+        ReferenceCountedDB meta, Container container, File dataDir,
+        long startTime) throws IOException {
+      ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
+      if (!checkDataDir(dataDir)) {
+        return crr;
+      }
+      try {
+        Table<String, BlockData> blockDataTable =
+            meta.getStore().getBlockDataTable();
+        DatanodeStore ds = meta.getStore();
+        DatanodeStoreSchemaTwoImpl dnStoreTwoImpl =
+            (DatanodeStoreSchemaTwoImpl) ds;
+        Table<Long, DeletedBlocksTransaction>
+            deleteTxns = dnStoreTwoImpl.getDeleteTransactionTable();
+        List<DeletedBlocksTransaction> delBlocks = new ArrayList<>();
+        int totalBlocks = 0;
+        try (TableIterator<Long,
+            ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
+            dnStoreTwoImpl.getDeleteTransactionTable().iterator()) {
+          while (iter.hasNext() && (totalBlocks < blockLimitPerTask)) {
+            DeletedBlocksTransaction delTx = iter.next().getValue();
+            totalBlocks += delTx.getLocalIDList().size();
+            delBlocks.add(delTx);
+          }
+        }
+
+        if (delBlocks.isEmpty()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("No transaction found in container : {}",
+                containerData.getContainerID());
+          }
+          return crr;
+        }
+
+        LOG.debug("Container : {}, To-Delete blocks : {}",
+            containerData.getContainerID(), delBlocks.size());
+
+        Handler handler = Objects.requireNonNull(ozoneContainer.getDispatcher()
+            .getHandler(container.getContainerType()));
+
+        deleteTransactions(delBlocks, handler, blockDataTable, container);
+
+        // Once blocks are deleted... remove the blockID from blockDataTable
+        // and also remove the transactions from txnTable.
+        try(BatchOperation batch = meta.getStore().getBatchHandler()
+            .initBatchOperation()) {
+          for (DeletedBlocksTransaction delTx : delBlocks) {
+            deleteTxns.deleteWithBatch(batch, delTx.getTxID());
+            for (Long blk : delTx.getLocalIDList()) {
+              String bID = blk.toString();
+              meta.getStore().getBlockDataTable().deleteWithBatch(batch, bID);
+            }
+          }
+          meta.getStore().getBatchHandler().commitBatchOperation(batch);
+          containerData.updateAndCommitDBCounters(meta, batch,
+              totalBlocks);
+          // update count of pending deletion blocks and block count in
+          // in-memory container status.
+          containerData.decrPendingDeletionBlocks(totalBlocks);
+          containerData.decrKeyCount(totalBlocks);
+        }
+
+        LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms",
+            containerData.getContainerID(), totalBlocks,
+            Time.monotonicNow() - startTime);
+
+        return crr;
+      } catch (IOException exception) {
+        LOG.warn(
+            "Deletion operation was not successful for container: " + container
+                .getContainerData().getContainerID(), exception);
+        throw exception;
+      }
+    }
+
+    private void deleteTransactions(List<DeletedBlocksTransaction> delBlocks,
+        Handler handler, Table<String, BlockData> blockDataTable,
+        Container container) throws IOException {
+      for (DeletedBlocksTransaction entry : delBlocks) {
+        for (Long blkLong : entry.getLocalIDList()) {
+          String blk = blkLong.toString();
+          BlockData blkInfo = blockDataTable.get(blk);
+          LOG.debug("Deleting block {}", blk);
+          try {
+            handler.deleteBlock(container, blkInfo);
+          } catch (InvalidProtocolBufferException e) {
+            LOG.error("Failed to parse block info for block {}", blk, e);
+          } catch (IOException e) {
+            LOG.error("Failed to delete files for block {}", blk, e);
+          }
+        }
       }
     }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
index 8895475..2fb1174 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
@@ -60,7 +60,7 @@ public abstract class AbstractDatanodeDBDefinition implements DBDefinition {
   @Override
   public DBColumnFamilyDefinition[] getColumnFamilies() {
     return new DBColumnFamilyDefinition[] {getBlockDataColumnFamily(),
-            getMetadataColumnFamily(), getDeletedBlocksColumnFamily()};
+        getMetadataColumnFamily(), getDeletedBlocksColumnFamily()};
   }
 
   public abstract DBColumnFamilyDefinition<String, BlockData>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java
index faf399d..7d5e053 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java
@@ -88,4 +88,9 @@ public class DatanodeSchemaOneDBDefinition
       getDeletedBlocksColumnFamily() {
     return DELETED_BLOCKS;
   }
+
+  public DBColumnFamilyDefinition[] getColumnFamilies() {
+    return new DBColumnFamilyDefinition[] {getBlockDataColumnFamily(),
+        getMetadataColumnFamily(), getDeletedBlocksColumnFamily() };
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java
index 2ac56f2..1fabd13 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java
@@ -17,16 +17,19 @@
  */
 package org.apache.hadoop.ozone.container.metadata;
 
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
 import org.apache.hadoop.hdds.utils.db.LongCodec;
 import org.apache.hadoop.hdds.utils.db.StringCodec;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 
 /**
  * This class defines the RocksDB structure for datanodes following schema
- * version 2, where the block data, metadata, and deleted block ids are put in
- * their own separate column families.
+ * version 2, where the block data, metadata, and transactions which are to be
+ * deleted are put in their own separate column families.
  */
 public class DatanodeSchemaTwoDBDefinition extends
         AbstractDatanodeDBDefinition {
@@ -34,7 +37,7 @@ public class DatanodeSchemaTwoDBDefinition extends
   public static final DBColumnFamilyDefinition<String, BlockData>
           BLOCK_DATA =
           new DBColumnFamilyDefinition<>(
-                  "block_data",
+                  "blockData",
                   String.class,
                   new StringCodec(),
                   BlockData.class,
@@ -52,17 +55,33 @@ public class DatanodeSchemaTwoDBDefinition extends
   public static final DBColumnFamilyDefinition<String, ChunkInfoList>
           DELETED_BLOCKS =
           new DBColumnFamilyDefinition<>(
-                  "deleted_blocks",
+                  "deletedBlocks",
                   String.class,
                   new StringCodec(),
                   ChunkInfoList.class,
                   new ChunkInfoListCodec());
 
+  public static final DBColumnFamilyDefinition<Long, DeletedBlocksTransaction>
+      DELETE_TRANSACTION =
+      new DBColumnFamilyDefinition<>(
+          "deleteTxns",
+          Long.class,
+          new LongCodec(),
+          StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction.class,
+          new DeletedBlocksTransactionCodec());
+
   protected DatanodeSchemaTwoDBDefinition(String dbPath) {
     super(dbPath);
   }
 
   @Override
+  public DBColumnFamilyDefinition[] getColumnFamilies() {
+    return new DBColumnFamilyDefinition[] {getBlockDataColumnFamily(),
+        getMetadataColumnFamily(), getDeletedBlocksColumnFamily(),
+        getDeleteTransactionsColumnFamily()};
+  }
+
+  @Override
   public DBColumnFamilyDefinition<String, BlockData>
       getBlockDataColumnFamily() {
     return BLOCK_DATA;
@@ -78,4 +97,9 @@ public class DatanodeSchemaTwoDBDefinition extends
       getDeletedBlocksColumnFamily() {
     return DELETED_BLOCKS;
   }
+
+  public DBColumnFamilyDefinition<Long, DeletedBlocksTransaction>
+      getDeleteTransactionsColumnFamily() {
+    return DELETE_TRANSACTION;
+  }
 }
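
Each table above is declared once as a DBColumnFamilyDefinition binding a column family name to key and value codecs; schema V2 simply adds a fourth definition for the transactions. Once the store is open, a definition resolves to a typed Table, as DatanodeStoreSchemaTwoImpl does in the next file; a sketch, where store stands for the value returned by getStore():

    // Sketch: resolving the new column family to a typed table.
    Table<Long, DeletedBlocksTransaction> txns =
        new DatanodeSchemaTwoDBDefinition(dbPath)
            .getDeleteTransactionsColumnFamily()
            .getTable(store);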
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
index df9b8c0..db8fe6b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
@@ -18,6 +18,9 @@
 package org.apache.hadoop.ozone.container.metadata;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.utils.db.Table;
 
 import java.io.IOException;
 
@@ -26,10 +29,13 @@ import java.io.IOException;
  * three column families/tables:
  * 1. A block data table.
  * 2. A metadata table.
- * 3. A deleted blocks table.
+ * 3. A delete transaction table.
  */
 public class DatanodeStoreSchemaTwoImpl extends AbstractDatanodeStore {
 
+  private final Table<Long, DeletedBlocksTransaction>
+      deleteTransactionTable;
+
   /**
    * Constructs the datanode store and starts the DB Services.
    *
@@ -41,5 +47,11 @@ public class DatanodeStoreSchemaTwoImpl extends AbstractDatanodeStore {
       throws IOException {
     super(config, containerID, new DatanodeSchemaTwoDBDefinition(dbPath),
         openReadOnly);
+    this.deleteTransactionTable = new DatanodeSchemaTwoDBDefinition(dbPath)
+        .getDeleteTransactionsColumnFamily().getTable(getStore());
+  }
+
+  public Table<Long, DeletedBlocksTransaction> getDeleteTransactionTable() {
+    return deleteTransactionTable;
   }
 }
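
Callers reach the new table by downcasting the generic DatanodeStore, as both DeleteBlocksCommandHandler and BlockDeletingService do. A usage sketch; guarding with instanceof here is a defensive variation, the patch itself branches on the container's schemaVersion string:

    // Sketch: obtaining the delete transaction table from a container DB.
    try (ReferenceCountedDB db = BlockUtils.getDB(containerData, conf)) {
      DatanodeStore ds = db.getStore();
      if (ds instanceof DatanodeStoreSchemaTwoImpl) {
        Table<Long, DeletedBlocksTransaction> txns =
            ((DatanodeStoreSchemaTwoImpl) ds).getDeleteTransactionTable();
        DeletedBlocksTransaction tx = txns.get(1L); // null if txID 1 absent
      }
    }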
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DeletedBlocksTransactionCodec.java
similarity index 54%
copy from hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
copy to hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DeletedBlocksTransactionCodec.java
index df9b8c0..90c26fe 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DeletedBlocksTransactionCodec.java
@@ -17,29 +17,30 @@
  */
 package org.apache.hadoop.ozone.container.metadata;
 
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 
 import java.io.IOException;
 
 /**
- * Constructs a datanode store in accordance with schema version 2, which uses
- * three column families/tables:
- * 1. A block data table.
- * 2. A metadata table.
- * 3. A deleted blocks table.
+ * Supports encoding and decoding {@link DeletedBlocksTransaction} objects.
  */
-public class DatanodeStoreSchemaTwoImpl extends AbstractDatanodeStore {
+public class DeletedBlocksTransactionCodec
+    implements Codec<DeletedBlocksTransaction> {
 
-  /**
-   * Constructs the datanode store and starts the DB Services.
-   *
-   * @param config - Ozone Configuration.
-   * @throws IOException - on Failure.
-   */
-  public DatanodeStoreSchemaTwoImpl(ConfigurationSource config,
-      long containerID, String dbPath, boolean openReadOnly)
+  @Override public byte[] toPersistedFormat(
+      DeletedBlocksTransaction deletedBlocksTransaction) {
+    return deletedBlocksTransaction.toByteArray();
+  }
+
+  @Override public DeletedBlocksTransaction fromPersistedFormat(byte[] rawData)
       throws IOException {
-    super(config, containerID, new DatanodeSchemaTwoDBDefinition(dbPath),
-        openReadOnly);
+    return DeletedBlocksTransaction.parseFrom(rawData);
+  }
+
+  @Override public DeletedBlocksTransaction copyObject(
+      DeletedBlocksTransaction deletedBlocksTransaction) {
+    throw new UnsupportedOperationException();
   }
 }
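
Since the codec only delegates to protobuf serialization, a round trip is easy to verify in isolation. A self-contained sketch (inside a method that may throw IOException), building the transaction the same way the new test code does:

    // Sketch: round-trip a transaction through the codec.
    DeletedBlocksTransaction tx = DeletedBlocksTransaction.newBuilder()
        .setTxID(1L).setContainerID(42L)
        .addAllLocalID(java.util.Arrays.asList(100L, 101L))
        .setCount(0).build();
    DeletedBlocksTransactionCodec codec = new DeletedBlocksTransactionCodec();
    byte[] raw = codec.toPersistedFormat(tx);      // tx.toByteArray()
    DeletedBlocksTransaction back = codec.fromPersistedFormat(raw);
    assert tx.equals(back); // protobuf messages compare by field values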
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
index 2eb6a39..96d4228 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
@@ -21,9 +21,10 @@ package org.apache.hadoop.ozone.container.common;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.UUID;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -34,9 +35,13 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.MutableConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
@@ -55,7 +60,6 @@ import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
@@ -64,11 +68,14 @@ import org.apache.hadoop.ozone.container.keyvalue.impl.FilePerBlockStrategy;
 import org.apache.hadoop.ozone.container.keyvalue.impl.FilePerChunkStrategy;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.container.testutils.BlockDeletingServiceTestImpl;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 
+import static java.util.stream.Collectors.toList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
 
 import org.junit.AfterClass;
@@ -82,7 +89,11 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTA
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_VERSIONS;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2;
 import static org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion.FILE_PER_BLOCK;
+import static org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask.LOG;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -101,16 +112,38 @@ public class TestBlockDeletingService {
   private static MutableConfigurationSource conf;
 
   private final ChunkLayOutVersion layout;
+  private final String schemaVersion;
   private int blockLimitPerTask;
   private static VolumeSet volumeSet;
 
-  public TestBlockDeletingService(ChunkLayOutVersion layout) {
-    this.layout = layout;
+  public TestBlockDeletingService(LayoutInfo layoutInfo) {
+    this.layout = layoutInfo.layout;
+    this.schemaVersion = layoutInfo.schemaVersion;
   }
 
   @Parameterized.Parameters
   public static Iterable<Object[]> parameters() {
-    return ChunkLayoutTestInfo.chunkLayoutParameters();
+    return LayoutInfo.layoutList.stream().map(each -> new Object[] {each})
+        .collect(toList());
+  }
+
+  public static class LayoutInfo {
+    private final String schemaVersion;
+    private final ChunkLayOutVersion layout;
+
+    public LayoutInfo(String schemaVersion, ChunkLayOutVersion layout) {
+      this.schemaVersion = schemaVersion;
+      this.layout = layout;
+    }
+
+    private static List<LayoutInfo> layoutList = new ArrayList<>();
+    static {
+      for (ChunkLayOutVersion ch : ChunkLayOutVersion.getAllVersions()) {
+        for (String sch : SCHEMA_VERSIONS) {
+          layoutList.add(new LayoutInfo(sch, ch));
+        }
+      }
+    }
   }
 
   @BeforeClass
@@ -158,6 +191,7 @@ public class TestBlockDeletingService {
     }
     byte[] arr = randomAlphanumeric(1048576).getBytes(UTF_8);
     ChunkBuffer buffer = ChunkBuffer.wrap(ByteBuffer.wrap(arr));
+    int txnID = 0;
     for (int x = 0; x < numOfContainers; x++) {
       long containerID = ContainerTestHelper.getTestContainerID();
       KeyValueContainerData data =
@@ -165,55 +199,164 @@ public class TestBlockDeletingService {
               ContainerTestHelper.CONTAINER_MAX_SIZE,
               UUID.randomUUID().toString(), datanodeUuid);
       data.closeContainer();
+      data.setSchemaVersion(schemaVersion);
       KeyValueContainer container = new KeyValueContainer(data, conf);
       container.create(volumeSet,
           new RoundRobinVolumeChoosingPolicy(), scmId);
       containerSet.addContainer(container);
       data = (KeyValueContainerData) containerSet.getContainer(
           containerID).getContainerData();
-      long chunkLength = 100;
-      try(ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
-        for (int j = 0; j < numOfBlocksPerContainer; j++) {
-          BlockID blockID =
-              ContainerTestHelper.getTestBlockID(containerID);
-          String deleteStateName = OzoneConsts.DELETING_KEY_PREFIX +
-              blockID.getLocalID();
-          BlockData kd = new BlockData(blockID);
-          List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
-          for (int k = 0; k < numOfChunksPerBlock; k++) {
-            final String chunkName = String.format("block.%d.chunk.%d", j, k);
-            final long offset = k * chunkLength;
-            ContainerProtos.ChunkInfo info =
-                ContainerProtos.ChunkInfo.newBuilder()
-                    .setChunkName(chunkName)
-                    .setLen(chunkLength)
-                    .setOffset(offset)
-                    .setChecksumData(Checksum.getNoChecksumDataProto())
-                    .build();
-            chunks.add(info);
-            ChunkInfo chunkInfo = new ChunkInfo(chunkName, offset, chunkLength);
-            ChunkBuffer chunkData = buffer.duplicate(0, (int) chunkLength);
-            chunkManager.writeChunk(container, blockID, chunkInfo, chunkData,
-                WRITE_STAGE);
-            chunkManager.writeChunk(container, blockID, chunkInfo, chunkData,
-                COMMIT_STAGE);
-          }
-          kd.setChunks(chunks);
-          metadata.getStore().getBlockDataTable().put(
-                  deleteStateName, kd);
-          container.getContainerData().incrPendingDeletionBlocks(1);
-        }
-        container.getContainerData().setKeyCount(numOfBlocksPerContainer);
-        // Set block count, bytes used and pending delete block count.
-        metadata.getStore().getMetadataTable().put(
-                OzoneConsts.BLOCK_COUNT, (long)numOfBlocksPerContainer);
-        metadata.getStore().getMetadataTable().put(
-                OzoneConsts.CONTAINER_BYTES_USED,
-            chunkLength * numOfChunksPerBlock * numOfBlocksPerContainer);
-        metadata.getStore().getMetadataTable().put(
-                OzoneConsts.PENDING_DELETE_BLOCK_COUNT,
-                (long)numOfBlocksPerContainer);
+      if (data.getSchemaVersion().equals(SCHEMA_V1)) {
+        createPendingDeleteBlocksSchema1(numOfBlocksPerContainer, data,
+            containerID, numOfChunksPerBlock, buffer, chunkManager, container);
+      } else if (data.getSchemaVersion().equals(SCHEMA_V2)) {
+        createPendingDeleteBlocksSchema2(numOfBlocksPerContainer, txnID,
+            containerID, numOfChunksPerBlock, buffer, chunkManager, container,
+            data);
+      } else {
+        throw new UnsupportedOperationException(
+            "Only schema version 1 and schema version 2 are "
+                + "supported.");
+      }
+    }
+  }
+
+  @SuppressWarnings("checkstyle:parameternumber")
+  private void createPendingDeleteBlocksSchema1(int numOfBlocksPerContainer,
+      KeyValueContainerData data, long containerID, int numOfChunksPerBlock,
+      ChunkBuffer buffer, ChunkManager chunkManager,
+      KeyValueContainer container) {
+    BlockID blockID = null;
+    try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
+      for (int j = 0; j < numOfBlocksPerContainer; j++) {
+        blockID = ContainerTestHelper.getTestBlockID(containerID);
+        String deleteStateName =
+            OzoneConsts.DELETING_KEY_PREFIX + blockID.getLocalID();
+        BlockData kd = new BlockData(blockID);
+        List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
+        putChunksInBlock(numOfChunksPerBlock, j, chunks, buffer, chunkManager,
+            container, blockID);
+        kd.setChunks(chunks);
+        metadata.getStore().getBlockDataTable().put(deleteStateName, kd);
+        container.getContainerData().incrPendingDeletionBlocks(1);
+      }
+      updateMetaData(data, container, numOfBlocksPerContainer,
+          numOfChunksPerBlock);
+    } catch (IOException exception) {
+      LOG.info("Exception " + exception);
+      LOG.warn("Failed to put block: " + blockID + " in BlockDataTable.");
+    }
+  }
+
+  @SuppressWarnings("checkstyle:parameternumber")
+  private void createPendingDeleteBlocksSchema2(int numOfBlocksPerContainer,
+      int txnID, long containerID, int numOfChunksPerBlock, ChunkBuffer buffer,
+      ChunkManager chunkManager, KeyValueContainer container,
+      KeyValueContainerData data) {
+    List<Long> containerBlocks = new ArrayList<>();
+    int blockCount = 0;
+    for (int i = 0; i < numOfBlocksPerContainer; i++) {
+      txnID = txnID + 1;
+      BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
+      BlockData kd = new BlockData(blockID);
+      List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
+      putChunksInBlock(numOfChunksPerBlock, i, chunks, buffer, chunkManager,
+          container, blockID);
+      kd.setChunks(chunks);
+      String bID = null;
+      try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
+        bID = blockID.getLocalID() + "";
+        metadata.getStore().getBlockDataTable().put(bID, kd);
+      } catch (IOException exception) {
+        LOG.info("Exception = " + exception);
+        LOG.warn("Failed to put block: " + bID + " in BlockDataTable.");
+      }
+      container.getContainerData().incrPendingDeletionBlocks(1);
+
+      // In the if statement below, we check whether a single container
+      // holds more blocks than 'blockLimitPerTask'. If so, we create
+      // (totalBlocksInContainer / blockLimitPerTask) transactions of
+      // blockLimitPerTask blocks each, plus one final transaction with the
+      // remaining (totalBlocksInContainer % blockLimitPerTask) blocks.
+      containerBlocks.add(blockID.getLocalID());
+      blockCount++;
+      if (blockCount == blockLimitPerTask || i == (numOfBlocksPerContainer
+          - 1)) {
+        createTxn(data, containerBlocks, txnID, containerID);
+        containerBlocks.clear();
+        blockCount = 0;
+      }
+    }
+    updateMetaData(data, container, numOfBlocksPerContainer,
+        numOfChunksPerBlock);
+  }
+
+  private void createTxn(KeyValueContainerData data, List<Long> containerBlocks,
+      int txnID, long containerID) {
+    try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
+      StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction dtx =
+          StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction
+              .newBuilder().setTxID(txnID).setContainerID(containerID)
+              .addAllLocalID(containerBlocks).setCount(0).build();
+      try (BatchOperation batch = metadata.getStore().getBatchHandler()
+          .initBatchOperation()) {
+        DatanodeStore ds = metadata.getStore();
+        DatanodeStoreSchemaTwoImpl dnStoreTwoImpl =
+            (DatanodeStoreSchemaTwoImpl) ds;
+        dnStoreTwoImpl.getDeleteTransactionTable()
+            .putWithBatch(batch, (long) txnID, dtx);
+        metadata.getStore().getBatchHandler().commitBatchOperation(batch);
       }
+    } catch (IOException exception) {
+      LOG.warn("Transaction creation was not successful for txnID: " + txnID
+          + " consisting of " + containerBlocks.size() + " blocks.");
+    }
+  }
+
+  private void putChunksInBlock(int numOfChunksPerBlock, int i,
+      List<ContainerProtos.ChunkInfo> chunks, ChunkBuffer buffer,
+      ChunkManager chunkManager, KeyValueContainer container, BlockID blockID) {
+    long chunkLength = 100;
+    try {
+      for (int k = 0; k < numOfChunksPerBlock; k++) {
+        final String chunkName = String.format("block.%d.chunk.%d", i, k);
+        final long offset = k * chunkLength;
+        ContainerProtos.ChunkInfo info =
+            ContainerProtos.ChunkInfo.newBuilder().setChunkName(chunkName)
+                .setLen(chunkLength).setOffset(offset)
+                .setChecksumData(Checksum.getNoChecksumDataProto()).build();
+        chunks.add(info);
+        ChunkInfo chunkInfo = new ChunkInfo(chunkName, offset, chunkLength);
+        ChunkBuffer chunkData = buffer.duplicate(0, (int) chunkLength);
+        chunkManager
+            .writeChunk(container, blockID, chunkInfo, chunkData, WRITE_STAGE);
+        chunkManager
+            .writeChunk(container, blockID, chunkInfo, chunkData, COMMIT_STAGE);
+      }
+    } catch (IOException ex) {
+      LOG.warn("Putting chunks in blocks was not successful for BlockID: "
+          + blockID);
+    }
+  }
+
+  private void updateMetaData(KeyValueContainerData data,
+      KeyValueContainer container, int numOfBlocksPerContainer,
+      int numOfChunksPerBlock) {
+    long chunkLength = 100;
+    try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
+      container.getContainerData().setKeyCount(numOfBlocksPerContainer);
+      // Set block count, bytes used and pending delete block count.
+      metadata.getStore().getMetadataTable()
+          .put(OzoneConsts.BLOCK_COUNT, (long) numOfBlocksPerContainer);
+      metadata.getStore().getMetadataTable()
+          .put(OzoneConsts.CONTAINER_BYTES_USED,
+              chunkLength * numOfChunksPerBlock * numOfBlocksPerContainer);
+      metadata.getStore().getMetadataTable()
+          .put(OzoneConsts.PENDING_DELETE_BLOCK_COUNT,
+              (long) numOfBlocksPerContainer);
+    } catch (IOException exception) {
+      LOG.warn("Meta Data update was not successful for container: "+container);
     }
   }
 
@@ -231,11 +374,32 @@ public class TestBlockDeletingService {
    * Get under deletion blocks count from DB,
    * note this info is parsed from container.db.
    */
-  private int getUnderDeletionBlocksCount(ReferenceCountedDB meta)
-      throws IOException {
-    return meta.getStore().getBlockDataTable()
-        .getRangeKVs(null, 100,
-        MetadataKeyFilters.getDeletingKeyFilter()).size();
+  private int getUnderDeletionBlocksCount(ReferenceCountedDB meta,
+      KeyValueContainerData data) throws IOException {
+    if (data.getSchemaVersion().equals(SCHEMA_V1)) {
+      return meta.getStore().getBlockDataTable()
+          .getRangeKVs(null, 100, MetadataKeyFilters.getDeletingKeyFilter())
+          .size();
+    } else if (data.getSchemaVersion().equals(SCHEMA_V2)) {
+      int pendingBlocks = 0;
+      DatanodeStore ds = meta.getStore();
+      DatanodeStoreSchemaTwoImpl dnStoreTwoImpl =
+          (DatanodeStoreSchemaTwoImpl) ds;
+      try (
+          TableIterator<Long, ? extends Table.KeyValue<Long, 
+              StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction>> 
+              iter = dnStoreTwoImpl.getDeleteTransactionTable().iterator()) {
+        while (iter.hasNext()) {
+          StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction
+              delTx = iter.next().getValue();
+          pendingBlocks += delTx.getLocalIDList().size();
+        }
+      }
+      return pendingBlocks;
+    } else {
+      throw new UnsupportedOperationException(
+          "Only schema version 1 and schema version 2 are supported.");
+    }
   }
 
 
@@ -261,6 +425,7 @@ public class TestBlockDeletingService {
     // Ensure 1 container was created
     List<ContainerData> containerData = Lists.newArrayList();
     containerSet.listContainer(0L, 1, containerData);
+    KeyValueContainerData data = (KeyValueContainerData) containerData.get(0);
     Assert.assertEquals(1, containerData.size());
 
     try(ReferenceCountedDB meta = BlockUtils.getDB(
@@ -280,7 +445,7 @@ public class TestBlockDeletingService {
       Assert.assertEquals(0, transactionId);
 
       // Ensure there are 3 blocks under deletion and 0 deleted blocks
-      Assert.assertEquals(3, getUnderDeletionBlocksCount(meta));
+      Assert.assertEquals(3, getUnderDeletionBlocksCount(meta, data));
       Assert.assertEquals(3, meta.getStore().getMetadataTable()
           .get(OzoneConsts.PENDING_DELETE_BLOCK_COUNT).longValue());
 
@@ -348,6 +513,9 @@ public class TestBlockDeletingService {
   public void testBlockDeletionTimeout() throws Exception {
     conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10);
     conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 2);
+    this.blockLimitPerTask =
+        conf.getInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER,
+            OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT);
     ContainerSet containerSet = new ContainerSet();
     createToDeleteBlocks(containerSet, 1, 3, 1);
     ContainerMetrics metrics = ContainerMetrics.create(conf);
@@ -394,7 +562,7 @@ public class TestBlockDeletingService {
       LogCapturer newLog = LogCapturer.captureLogs(BackgroundService.LOG);
       GenericTestUtils.waitFor(() -> {
         try {
-          return getUnderDeletionBlocksCount(meta) == 0;
+          return getUnderDeletionBlocksCount(meta, data) == 0;
         } catch (IOException ignored) {
         }
         return false;
@@ -445,6 +613,9 @@ public class TestBlockDeletingService {
         TopNOrderedContainerDeletionChoosingPolicy.class.getName());
     conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 1);
     conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 1);
+    this.blockLimitPerTask =
+        conf.getInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER,
+            OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT);
     ContainerSet containerSet = new ContainerSet();
 
     int containerCount = 2;
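
The test's parameter space is now the cross product of chunk layouts and schema versions, so each layout runs against both schemas. The LayoutInfo static block above builds it with nested loops; an equivalent stream-based sketch, using the test's existing toList static import:

    // Sketch: the same layout x schema cross product, stream form.
    List<Object[]> params = ChunkLayOutVersion.getAllVersions().stream()
        .flatMap(layout -> java.util.Arrays.stream(SCHEMA_VERSIONS)
            .map(schema -> new Object[] {new LayoutInfo(schema, layout)}))
        .collect(toList());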
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 548f073..7bccd8e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -92,11 +92,15 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
-import static org.apache.hadoop.hdds.HddsConfigKeys.*;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Test class that exercises the StorageContainerManager.
@@ -272,8 +276,6 @@ public class TestStorageContainerManager {
 
       Map<Long, List<Long>> containerBlocks = createDeleteTXLog(delLog,
           keyLocations, helper);
-      Set<Long> containerIDs = containerBlocks.keySet();
-
       // Verify a few TX gets created in the TX log.
       Assert.assertTrue(delLog.getNumOfValidTransactions() > 0);
 
@@ -289,8 +291,7 @@ public class TestStorageContainerManager {
           return false;
         }
       }, 1000, 10000);
-      Assert.assertTrue(helper.getAllBlocks(containerIDs).isEmpty());
-
+      Assert.assertTrue(helper.verifyBlocksWithTxnTable(containerBlocks));
       // Continue the work, add some TXs that with known container names,
       // but unknown block IDs.
       for (Long containerID : containerBlocks.keySet()) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
index c67fe30..fe0e075 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.ozone;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -32,9 +33,14 @@ import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -137,6 +143,30 @@ public class TestStorageContainerManagerHelper {
     return allBlocks;
   }
 
+  public boolean verifyBlocksWithTxnTable(Map<Long, List<Long>> containerBlocks)
+      throws IOException {
+    Set<Long> containerIDs = containerBlocks.keySet();
+    for (Long entry : containerIDs) {
+      ReferenceCountedDB meta = getContainerMetadata(entry);
+      DatanodeStore ds = meta.getStore();
+      DatanodeStoreSchemaTwoImpl dnStoreTwoImpl =
+          (DatanodeStoreSchemaTwoImpl) ds;
+      List<? extends Table.KeyValue<Long, DeletedBlocksTransaction>>
+          txnsInTxnTable = dnStoreTwoImpl.getDeleteTransactionTable()
+          .getRangeKVs(null, Integer.MAX_VALUE, null);
+      List<Long> conID = new ArrayList<>();
+      for (Table.KeyValue<Long, DeletedBlocksTransaction> txn :
+          txnsInTxnTable) {
+        conID.addAll(txn.getValue().getLocalIDList());
+      }
+      if (!conID.equals(containerBlocks.get(entry))) {
+        return false;
+      }
+      meta.close();
+    }
+    return true;
+  }
+
   private ReferenceCountedDB getContainerMetadata(Long containerID)
       throws IOException {
     ContainerWithPipeline containerWithPipeline = cluster

