You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by si...@apache.org on 2023/04/13 20:44:18 UTC

[ozone] branch master updated: HDDS-8317. [Snapshot] DirectoryDeletingService should clean up Snapshot's deletedDirTable. (#4543)

This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new dd003040a4 HDDS-8317. [Snapshot] DirectoryDeletingService should clean up Snapshot's deletedDirTable. (#4543)
dd003040a4 is described below

commit dd003040a41def491e8de003ef8539ce40854972
Author: Aswin Shakil Balasubramanian <as...@gmail.com>
AuthorDate: Thu Apr 13 13:44:12 2023 -0700

    HDDS-8317. [Snapshot] DirectoryDeletingService should clean up Snapshot's deletedDirTable. (#4543)
---
 .../ozone/TestDirectoryDeletingServiceWithFSO.java | 134 +++++++++-
 .../src/main/proto/OmClientProtocol.proto          |   2 +
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |   9 +
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |   8 +-
 .../key/OMDirectoriesPurgeRequestWithFSO.java      |  18 +-
 .../snapshot/OMSnapshotMoveDeletedKeysRequest.java |   4 +-
 .../key/OMDirectoriesPurgeResponseWithFSO.java     |  50 ++--
 .../OMSnapshotMoveDeletedKeysResponse.java         |  22 +-
 .../om/service/AbstractKeyDeletingService.java     | 214 ++++++++++++++-
 .../ozone/om/service/DirectoryDeletingService.java | 292 ++++-----------------
 .../ozone/om/service/SnapshotDeletingService.java  | 171 ++++++++++--
 11 files changed, 631 insertions(+), 293 deletions(-)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSO.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSO.java
index f407c1444d..4477e78722 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSO.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSO.java
@@ -27,11 +27,13 @@ import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.TestDataUtil;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
 import org.apache.hadoop.ozone.om.service.KeyDeletingService;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
@@ -172,7 +174,7 @@ public class TestDirectoryDeletingServiceWithFSO {
     assertEquals(root.getName(),
         dirTable.iterator().next().getValue().getName());
 
-    assertTrue(dirDeletingService.getRunCount() > 1);
+    assertTrue(dirDeletingService.getRunCount().get() > 1);
   }
 
   /**
@@ -221,7 +223,7 @@ public class TestDirectoryDeletingServiceWithFSO {
     assertSubPathsCount(dirDeletingService::getMovedDirsCount, 0);
     assertSubPathsCount(dirDeletingService::getDeletedDirsCount, 0);
 
-    long preRunCount = dirDeletingService.getRunCount();
+    long preRunCount = dirDeletingService.getRunCount().get();
 
     // Delete the appRoot
     fs.delete(appRoot, true);
@@ -238,8 +240,8 @@ public class TestDirectoryDeletingServiceWithFSO {
     assertSubPathsCount(dirDeletingService::getMovedDirsCount, 18);
     assertSubPathsCount(dirDeletingService::getDeletedDirsCount, 19);
 
-    long elapsedRunCount = dirDeletingService.getRunCount() - preRunCount;
-    assertTrue(dirDeletingService.getRunCount() > 1);
+    long elapsedRunCount = dirDeletingService.getRunCount().get() - preRunCount;
+    assertTrue(dirDeletingService.getRunCount().get() > 1);
     // Ensure dir deleting speed, here provide a backup value for safe CI
     assertTrue(elapsedRunCount >= 7);
   }
@@ -290,7 +292,7 @@ public class TestDirectoryDeletingServiceWithFSO {
     assertSubPathsCount(dirDeletingService::getMovedDirsCount, 2);
     assertSubPathsCount(dirDeletingService::getDeletedDirsCount, 5);
 
-    assertTrue(dirDeletingService.getRunCount() > 1);
+    assertTrue(dirDeletingService.getRunCount().get() > 1);
   }
 
   @Test
@@ -374,6 +376,128 @@ public class TestDirectoryDeletingServiceWithFSO {
     assertEquals(prevDeletedKeyCount + 5, currentDeletedKeyCount);
   }
 
+  @Test
+  public void testDirDeletedTableCleanUpForSnapshot() throws Exception {
+    Table<String, OmKeyInfo> deletedDirTable =
+        cluster.getOzoneManager().getMetadataManager().getDeletedDirTable();
+    Table<String, OmKeyInfo> keyTable =
+        cluster.getOzoneManager().getMetadataManager()
+            .getKeyTable(getFSOBucketLayout());
+    Table<String, OmDirectoryInfo> dirTable =
+        cluster.getOzoneManager().getMetadataManager().getDirectoryTable();
+    Table<String, RepeatedOmKeyInfo> deletedKeyTable =
+        cluster.getOzoneManager().getMetadataManager().getDeletedTable();
+    Table<String, SnapshotInfo> snapshotInfoTable =
+        cluster.getOzoneManager().getMetadataManager().getSnapshotInfoTable();
+
+    /*    DirTable                               KeyTable
+    /v/b/snapDir                       /v/b/snapDir/testKey0 - testKey4
+    /v/b/snapDir/appRoot/              /v/b/snapDir/appRoot/parentDir0/childFile
+    /v/b/snapDir/appRoot/parentDir0/   /v/b/snapDir/appRoot/parentDir1/childFile
+    /v/b/snapDir/appRoot/parentDir1/   /v/b/snapDir/appRoot/parentDir2/childFile
+    /v/b/snapDir/appRoot/parentDir2/
+     */
+
+    Path root = new Path("/snapDir");
+    Path appRoot = new Path(root, "appRoot");
+    // Create  parent dir from root.
+    fs.mkdirs(root);
+
+    // Add 5 sub files inside root dir
+    for (int i = 0; i < 5; i++) {
+      Path path = new Path(root, "testKey" + i);
+      try (FSDataOutputStream stream = fs.create(path)) {
+        stream.write(1);
+      }
+    }
+
+    // Add 3 more sub files in different level
+    for (int i = 0; i < 3; i++) {
+      Path parent = new Path(appRoot, "parentDir" + i);
+      Path child = new Path(parent, "childFile");
+      ContractTestUtils.touch(fs, child);
+    }
+
+    KeyDeletingService keyDeletingService =
+        (KeyDeletingService) cluster.getOzoneManager().getKeyManager()
+            .getDeletingService();
+
+    // Before delete
+    assertTableRowCount(deletedDirTable, 0);
+    assertTableRowCount(keyTable, 8);
+    assertTableRowCount(dirTable, 5);
+
+    // Create snapshot
+    client.getObjectStore().createSnapshot(volumeName, bucketName, "snap1");
+    assertTableRowCount(snapshotInfoTable, 1);
+
+    // Case-1) Delete 3 Files directly.
+    for (int i = 0; i < 3; i++) {
+      Path path = new Path(root, "testKey" + i);
+      fs.delete(path, true);
+    }
+
+    DirectoryDeletingService dirDeletingService =
+        (DirectoryDeletingService) cluster.getOzoneManager().getKeyManager()
+            .getDirDeletingService();
+
+    // After delete. 5 more files left out under the root dir
+    assertTableRowCount(keyTable, 5);
+    assertTableRowCount(dirTable, 5);
+
+    // KeyDeletingService and DirectoryDeletingService will not
+    // clean up because the paths are part of a snapshot.
+    assertTableRowCount(deletedDirTable, 0);
+    assertTableRowCount(deletedKeyTable, 3);
+
+    assertSubPathsCount(dirDeletingService::getMovedFilesCount, 0);
+    assertSubPathsCount(dirDeletingService::getMovedDirsCount, 0);
+    assertSubPathsCount(dirDeletingService::getDeletedDirsCount, 0);
+
+    // Case-2) Delete dir
+    fs.delete(root, true);
+
+    // After delete. 5 sub files are still in keyTable.
+    // 4 dirs in dirTable.
+    assertTableRowCount(keyTable, 5);
+    assertTableRowCount(dirTable, 4);
+
+    // KeyDeletingService and DirectoryDeletingService will not
+    // clean up because the paths are part of a snapshot.
+    // As a result, only 1 deleted dir and 3 deleted files will
+    // remain in deletedDirTable and deletedTable respectively.
+    assertTableRowCount(deletedDirTable, 1);
+    assertTableRowCount(deletedKeyTable, 3);
+
+    assertSubPathsCount(dirDeletingService::getMovedFilesCount, 0);
+    assertSubPathsCount(dirDeletingService::getMovedDirsCount, 0);
+    assertSubPathsCount(dirDeletingService::getDeletedDirsCount, 0);
+
+    // Manually clean up the tables for the next tests
+    cleanupTables();
+  }
+
+  private void cleanupTables() throws IOException {
+    cluster.getOzoneManager().getMetadataManager()
+        .getDeletedDirTable().iterator().removeFromDB();
+    try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
+             iterator = cluster.getOzoneManager().getMetadataManager()
+        .getFileTable().iterator()) {
+      while (iterator.hasNext()) {
+        iterator.next();
+        iterator.removeFromDB();
+      }
+    }
+    try (TableIterator<String, ? extends Table.KeyValue<String,
+        OmDirectoryInfo>> deletedItr = cluster.getOzoneManager()
+        .getMetadataManager().getDirectoryTable().iterator()) {
+      while (deletedItr.hasNext()) {
+        deletedItr.next();
+        deletedItr.removeFromDB();
+      }
+    }
+  }
+
   static void assertSubPathsCount(LongSupplier pathCount, long expectedCount)
       throws TimeoutException, InterruptedException {
     GenericTestUtils.waitFor(() -> pathCount.getAsLong() >= expectedCount,
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 5cbc442ddd..a0d5fcbf6c 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -1223,6 +1223,7 @@ message PurgePathsResponse {
 
 message PurgeDirectoriesRequest {
   repeated PurgePathRequest deletedPath = 1;
+  optional string snapshotTableKey = 2;
 }
 
 message PurgeDirectoriesResponse {
@@ -1714,6 +1715,7 @@ message SnapshotMoveDeletedKeysRequest {
   repeated SnapshotMoveKeyInfos nextDBKeys = 2;
   repeated SnapshotMoveKeyInfos reclaimKeys = 3;
   repeated hadoop.hdds.KeyValue renamedKeys = 4;
+  repeated string deletedDirsToMove = 5;
 }
 
 message SnapshotMoveKeyInfos {
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index 276403b3d9..7eed84f753 100644
--- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -466,6 +466,15 @@ public interface OMMetadataManager extends DBStoreHAManager {
    */
   String getOzoneDeletePathKey(long objectId, String pathKey);
 
+  /**
+   * Given an ozone delete path key, return the corresponding
+   * DB path key for the directory table.
+   *
+   * @param ozoneDeletePath - ozone delete path
+   * @return DB directory key as String.
+   */
+  String getOzoneDeletePathDirKey(String ozoneDeletePath);
+
   /**
    * Returns DB key name of an open file in OM metadata store. Should be
    * #open# prefix followed by actual leaf node name.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 0b88bef837..7e7dbc2777 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -1544,7 +1544,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
   /**
    * Get the latest OmSnapshot for a snapshot path.
    */
-  private OmSnapshot getLatestSnapshot(String volumeName, String bucketName,
+  public OmSnapshot getLatestSnapshot(String volumeName, String bucketName,
                                        OmSnapshotManager snapshotManager)
       throws IOException {
 
@@ -1847,6 +1847,12 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
     return pathKey + OM_KEY_PREFIX + objectId;
   }
 
+  @Override
+  public String getOzoneDeletePathDirKey(String ozoneDeletePath) {
+    return ozoneDeletePath.substring(0,
+        ozoneDeletePath.lastIndexOf(OM_KEY_PREFIX));
+  }
+
   @Override
   public String getOpenFileName(long volumeId, long bucketId,
                                 long parentID, String fileName,
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
index f319a5c9c8..90ea43a53e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
@@ -25,10 +25,12 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmSnapshot;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -39,6 +41,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRespo
 
 import java.util.List;
 
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
 
 /**
@@ -55,14 +58,27 @@ public class OMDirectoriesPurgeRequestWithFSO extends OMKeyRequest {
       long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
     OzoneManagerProtocolProtos.PurgeDirectoriesRequest purgeDirsRequest =
         getOmRequest().getPurgeDirectoriesRequest();
+    String fromSnapshot = purgeDirsRequest.hasSnapshotTableKey() ?
+        purgeDirsRequest.getSnapshotTableKey() : null;
 
     List<OzoneManagerProtocolProtos.PurgePathRequest> purgeRequests =
             purgeDirsRequest.getDeletedPathList();
 
+    OmSnapshot omFromSnapshot = null;
     Set<Pair<String, String>> lockSet = new HashSet<>();
     Map<Pair<String, String>, OmBucketInfo> volBucketInfoMap = new HashMap<>();
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
     try {
+      if (fromSnapshot != null) {
+        SnapshotInfo snapshotInfo =
+            ozoneManager.getMetadataManager().getSnapshotInfoTable()
+                .get(fromSnapshot);
+        omFromSnapshot = (OmSnapshot) ozoneManager.getOmSnapshotManager()
+            .checkForSnapshot(snapshotInfo.getVolumeName(),
+                snapshotInfo.getBucketName(),
+                getSnapshotPrefix(snapshotInfo.getName()));
+      }
+
       for (OzoneManagerProtocolProtos.PurgePathRequest path : purgeRequests) {
         for (OzoneManagerProtocolProtos.KeyInfo key :
             path.getMarkDeletedSubDirsList()) {
@@ -129,7 +145,7 @@ public class OMDirectoriesPurgeRequestWithFSO extends OMKeyRequest {
         getOmRequest());
     OMClientResponse omClientResponse = new OMDirectoriesPurgeResponseWithFSO(
         omResponse.build(), purgeRequests, ozoneManager.isRatisEnabled(),
-            getBucketLayout(), volBucketInfoMap);
+            getBucketLayout(), volBucketInfoMap, omFromSnapshot);
     addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
         omDoubleBufferHelper);
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveDeletedKeysRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveDeletedKeysRequest.java
index 44a089b2ab..1b0488a326 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveDeletedKeysRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveDeletedKeysRequest.java
@@ -91,6 +91,8 @@ public class OMSnapshotMoveDeletedKeysRequest extends OMClientRequest {
           moveDeletedKeysRequest.getReclaimKeysList();
       List<HddsProtos.KeyValue> renamedKeysList =
           moveDeletedKeysRequest.getRenamedKeysList();
+      List<String> movedDirs =
+          moveDeletedKeysRequest.getDeletedDirsToMoveList();
 
       OmSnapshot omNextSnapshot = null;
 
@@ -103,7 +105,7 @@ public class OMSnapshotMoveDeletedKeysRequest extends OMClientRequest {
 
       omClientResponse = new OMSnapshotMoveDeletedKeysResponse(
           omResponse.build(), omFromSnapshot, omNextSnapshot,
-          nextDBKeysList, reclaimKeysList, renamedKeysList);
+          nextDBKeysList, reclaimKeysList, renamedKeysList, movedDirs);
 
     } catch (IOException ex) {
       omClientResponse = new OMSnapshotMoveDeletedKeysResponse(
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMDirectoriesPurgeResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMDirectoriesPurgeResponseWithFSO.java
index dec84eaef3..eda6c8fb66 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMDirectoriesPurgeResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMDirectoriesPurgeResponseWithFSO.java
@@ -21,8 +21,10 @@ package org.apache.hadoop.ozone.om.response.key;
 import java.util.Map;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmSnapshot;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -55,73 +57,89 @@ public class OMDirectoriesPurgeResponseWithFSO extends OmKeyResponse {
   private List<OzoneManagerProtocolProtos.PurgePathRequest> paths;
   private boolean isRatisEnabled;
   private Map<Pair<String, String>, OmBucketInfo> volBucketInfoMap;
+  private OmSnapshot fromSnapshot;
 
 
   public OMDirectoriesPurgeResponseWithFSO(@Nonnull OMResponse omResponse,
       @Nonnull List<OzoneManagerProtocolProtos.PurgePathRequest> paths,
       boolean isRatisEnabled, @Nonnull BucketLayout bucketLayout,
-      Map<Pair<String, String>, OmBucketInfo> volBucketInfoMap) {
+      Map<Pair<String, String>, OmBucketInfo> volBucketInfoMap,
+      OmSnapshot fromSnapshot) {
     super(omResponse, bucketLayout);
     this.paths = paths;
     this.isRatisEnabled = isRatisEnabled;
     this.volBucketInfoMap = volBucketInfoMap;
+    this.fromSnapshot = fromSnapshot;
   }
 
   @Override
-  public void addToDBBatch(OMMetadataManager omMetadataManager,
-      BatchOperation batchOperation) throws IOException {
+  public void addToDBBatch(OMMetadataManager metadataManager,
+      BatchOperation batchOp) throws IOException {
+    if (fromSnapshot != null) {
+      DBStore fromSnapshotStore = fromSnapshot.getMetadataManager().getStore();
+      // Init Batch Operation for snapshot db.
+      try (BatchOperation writeBatch = fromSnapshotStore.initBatchOperation()) {
+        processPaths(fromSnapshot.getMetadataManager(), writeBatch);
+        fromSnapshotStore.commitBatchOperation(writeBatch);
+      }
+    } else {
+      processPaths(metadataManager, batchOp);
+    }
+  }
 
+  public void processPaths(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
     for (OzoneManagerProtocolProtos.PurgePathRequest path : paths) {
       final long volumeId = path.getVolumeId();
       final long bucketId = path.getBucketId();
 
       final List<OzoneManagerProtocolProtos.KeyInfo> deletedSubFilesList =
-              path.getDeletedSubFilesList();
+          path.getDeletedSubFilesList();
       final List<OzoneManagerProtocolProtos.KeyInfo> markDeletedSubDirsList =
-              path.getMarkDeletedSubDirsList();
+          path.getMarkDeletedSubDirsList();
 
       // Add all sub-directories to deleted directory table.
       for (OzoneManagerProtocolProtos.KeyInfo key : markDeletedSubDirsList) {
         OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf(key);
         String ozoneDbKey = omMetadataManager.getOzonePathKey(volumeId,
-                bucketId, keyInfo.getParentObjectID(), keyInfo.getFileName());
+            bucketId, keyInfo.getParentObjectID(), keyInfo.getFileName());
         String ozoneDeleteKey = omMetadataManager.getOzoneDeletePathKey(
             key.getObjectID(), ozoneDbKey);
         omMetadataManager.getDeletedDirTable().putWithBatch(batchOperation,
             ozoneDeleteKey, keyInfo);
 
         omMetadataManager.getDirectoryTable().deleteWithBatch(batchOperation,
-                ozoneDbKey);
+            ozoneDbKey);
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("markDeletedDirList KeyName: {}, DBKey: {}",
-                  keyInfo.getKeyName(), ozoneDbKey);
+              keyInfo.getKeyName(), ozoneDbKey);
         }
       }
 
       for (OzoneManagerProtocolProtos.KeyInfo key : deletedSubFilesList) {
         OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf(key);
         String ozoneDbKey = omMetadataManager.getOzonePathKey(volumeId,
-                bucketId, keyInfo.getParentObjectID(), keyInfo.getFileName());
+            bucketId, keyInfo.getParentObjectID(), keyInfo.getFileName());
         omMetadataManager.getKeyTable(getBucketLayout())
-                .deleteWithBatch(batchOperation, ozoneDbKey);
+            .deleteWithBatch(batchOperation, ozoneDbKey);
 
         if (LOG.isDebugEnabled()) {
           LOG.info("Move keyName:{} to DeletedTable DBKey: {}",
-                  keyInfo.getKeyName(), ozoneDbKey);
+              keyInfo.getKeyName(), ozoneDbKey);
         }
 
         RepeatedOmKeyInfo repeatedOmKeyInfo = OmUtils.prepareKeyForDelete(
-                keyInfo, null, keyInfo.getUpdateID(), isRatisEnabled);
+            keyInfo, null, keyInfo.getUpdateID(), isRatisEnabled);
 
         String deletedKey = omMetadataManager
-                .getOzoneKey(keyInfo.getVolumeName(), keyInfo.getBucketName(),
-                        keyInfo.getKeyName());
+            .getOzoneKey(keyInfo.getVolumeName(), keyInfo.getBucketName(),
+                keyInfo.getKeyName());
 
         // TODO: [SNAPSHOT] Acquire deletedTable write table lock
 
         omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
-                deletedKey, repeatedOmKeyInfo);
+            deletedKey, repeatedOmKeyInfo);
 
         // TODO: [SNAPSHOT] Release deletedTable write table lock
 
@@ -130,7 +148,7 @@ public class OMDirectoriesPurgeResponseWithFSO extends OmKeyResponse {
       // Delete the visited directory from deleted directory table
       if (path.hasDeletedDir()) {
         omMetadataManager.getDeletedDirTable().deleteWithBatch(batchOperation,
-                path.getDeletedDir());
+            path.getDeletedDir());
 
         if (LOG.isDebugEnabled()) {
           LOG.info("Purge Deleted Directory DBKey: {}", path.getDeletedDir());
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java
index 8cc211b4d0..f09e6dbd13 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java
@@ -48,18 +48,21 @@ public class OMSnapshotMoveDeletedKeysResponse extends OMClientResponse {
   private List<SnapshotMoveKeyInfos> nextDBKeysList;
   private List<SnapshotMoveKeyInfos> reclaimKeysList;
   private List<HddsProtos.KeyValue> renamedKeysList;
+  private List<String> movedDirs;
 
   public OMSnapshotMoveDeletedKeysResponse(OMResponse omResponse,
        @Nonnull OmSnapshot omFromSnapshot, OmSnapshot omNextSnapshot,
        List<SnapshotMoveKeyInfos> nextDBKeysList,
        List<SnapshotMoveKeyInfos> reclaimKeysList,
-       List<HddsProtos.KeyValue> renamedKeysList) {
+       List<HddsProtos.KeyValue> renamedKeysList,
+       List<String> movedDirs) {
     super(omResponse);
     this.fromSnapshot = omFromSnapshot;
     this.nextSnapshot = omNextSnapshot;
     this.nextDBKeysList = nextDBKeysList;
     this.reclaimKeysList = reclaimKeysList;
     this.renamedKeysList = renamedKeysList;
+    this.movedDirs = movedDirs;
   }
 
   /**
@@ -81,11 +84,13 @@ public class OMSnapshotMoveDeletedKeysResponse extends OMClientResponse {
       try (BatchOperation writeBatch = nextSnapshotStore.initBatchOperation()) {
         processKeys(writeBatch, nextSnapshot.getMetadataManager(),
             nextDBKeysList, true);
+        processDirs(writeBatch, nextSnapshot.getMetadataManager());
         nextSnapshotStore.commitBatchOperation(writeBatch);
       }
     } else {
       // Handle the case where there is no next Snapshot.
       processKeys(batchOperation, omMetadataManager, nextDBKeysList, true);
+      processDirs(batchOperation, omMetadataManager);
     }
 
     // Update From Snapshot Deleted Table.
@@ -98,6 +103,21 @@ public class OMSnapshotMoveDeletedKeysResponse extends OMClientResponse {
     }
   }
 
+  private void processDirs(BatchOperation batchOp,
+                           OMMetadataManager omMetadataManager)
+      throws IOException {
+    for (String movedDirsKey : movedDirs) {
+      OmKeyInfo keyInfo = fromSnapshot.getMetadataManager().getDeletedDirTable()
+          .get(movedDirsKey);
+      // Move deleted dirs to next snapshot or active DB
+      omMetadataManager.getDeletedDirTable().putWithBatch(
+          batchOp, movedDirsKey, keyInfo);
+      // Delete dirs from current snapshot that are moved to next snapshot.
+      fromSnapshot.getMetadataManager().getDeletedDirTable()
+          .deleteWithBatch(batchOp, movedDirsKey);
+    }
+  }
+
   private void processKeys(BatchOperation batchOp,
       OMMetadataManager metadataManager,
       List<SnapshotMoveKeyInfos> keyList,
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
index 6af371086f..45230073b0 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
@@ -24,15 +24,20 @@ import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
 import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgePathRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.protocol.ClientId;
@@ -51,13 +56,17 @@ import java.util.concurrent.atomic.AtomicLong;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 
 /**
- * Abstract's KeyDeletingService.
+ * Abstracts common code from KeyDeletingService and DirectoryDeletingService
+ * which is now used by SnapshotDeletingService as well.
  */
 public abstract class AbstractKeyDeletingService extends BackgroundService {
 
   private final OzoneManager ozoneManager;
   private final ScmBlockLocationProtocol scmClient;
   private static ClientId clientId = ClientId.randomId();
+  private final AtomicLong deletedDirsCount;
+  private final AtomicLong movedDirsCount;
+  private final AtomicLong movedFilesCount;
   private final AtomicLong runCount;
 
   public AbstractKeyDeletingService(String serviceName, long interval,
@@ -66,6 +75,9 @@ public abstract class AbstractKeyDeletingService extends BackgroundService {
     super(serviceName, interval, unit, threadPoolSize, serviceTimeout);
     this.ozoneManager = ozoneManager;
     this.scmClient = scmClient;
+    this.deletedDirsCount = new AtomicLong(0);
+    this.movedDirsCount = new AtomicLong(0);
+    this.movedFilesCount = new AtomicLong(0);
     this.runCount = new AtomicLong(0);
   }
 
@@ -189,7 +201,7 @@ public abstract class AbstractKeyDeletingService extends BackgroundService {
     return deletedCount;
   }
 
-  private RaftClientRequest createRaftClientRequestForPurge(
+  protected RaftClientRequest createRaftClientRequestForPurge(
       OMRequest omRequest) {
     return RaftClientRequest.newBuilder()
         .setClientId(clientId)
@@ -220,6 +232,173 @@ public abstract class AbstractKeyDeletingService extends BackgroundService {
     map.get(volumeBucketPair).add(objectKey);
   }
 
+  protected void submitPurgePaths(List<PurgePathRequest> requests,
+                                  String snapTableKey) {
+    OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest =
+        OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder();
+
+    if (snapTableKey != null) {
+      purgeDirRequest.setSnapshotTableKey(snapTableKey);
+    }
+    purgeDirRequest.addAllDeletedPath(requests);
+
+    OzoneManagerProtocolProtos.OMRequest omRequest =
+        OzoneManagerProtocolProtos.OMRequest.newBuilder()
+            .setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories)
+            .setPurgeDirectoriesRequest(purgeDirRequest)
+            .setClientId(clientId.toString())
+            .build();
+
+    // Submit Purge paths request to OM
+    try {
+      RaftClientRequest raftClientRequest =
+          createRaftClientRequestForPurge(omRequest);
+      ozoneManager.getOmRatisServer().submitRequest(omRequest,
+          raftClientRequest);
+    } catch (ServiceException e) {
+      LOG.error("PurgePaths request failed. Will retry at next run.", e);
+    }
+  }
+
+  private OzoneManagerProtocolProtos.PurgePathRequest wrapPurgeRequest(
+      final long volumeId,
+      final long bucketId,
+      final String purgeDeletedDir,
+      final List<OmKeyInfo> purgeDeletedFiles,
+      final List<OmKeyInfo> markDirsAsDeleted) {
+    // Put all keys to be purged in a list
+    PurgePathRequest.Builder purgePathsRequest = PurgePathRequest.newBuilder();
+    purgePathsRequest.setVolumeId(volumeId);
+    purgePathsRequest.setBucketId(bucketId);
+
+    if (purgeDeletedDir != null) {
+      purgePathsRequest.setDeletedDir(purgeDeletedDir);
+    }
+
+    for (OmKeyInfo purgeFile : purgeDeletedFiles) {
+      purgePathsRequest.addDeletedSubFiles(
+          purgeFile.getProtobuf(true, ClientVersion.CURRENT_VERSION));
+    }
+
+    // Add these directories to deletedDirTable, so that their sub-paths are
+    // traversed in the next iteration and all sub-children get cleaned up.
+    for (OmKeyInfo dir : markDirsAsDeleted) {
+      purgePathsRequest.addMarkDeletedSubDirs(
+          dir.getProtobuf(ClientVersion.CURRENT_VERSION));
+    }
+
+    return purgePathsRequest.build();
+  }
+
+  protected PurgePathRequest prepareDeleteDirRequest(
+      long remainNum, OmKeyInfo pendingDeletedDirInfo, String delDirName,
+      List<Pair<String, OmKeyInfo>> subDirList,
+      KeyManager keyManager) throws IOException {
+    // step-0: Get one pending deleted directory
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Pending deleted dir name: {}",
+          pendingDeletedDirInfo.getKeyName());
+    }
+
+    final String[] keys = delDirName.split(OM_KEY_PREFIX);
+    final long volumeId = Long.parseLong(keys[1]);
+    final long bucketId = Long.parseLong(keys[2]);
+
+    // step-1: get all sub directories under the deletedDir
+    List<OmKeyInfo> subDirs = keyManager
+        .getPendingDeletionSubDirs(volumeId, bucketId,
+            pendingDeletedDirInfo, remainNum);
+    remainNum = remainNum - subDirs.size();
+
+    OMMetadataManager omMetadataManager = keyManager.getMetadataManager();
+    for (OmKeyInfo dirInfo : subDirs) {
+      String ozoneDbKey = omMetadataManager.getOzonePathKey(volumeId,
+          bucketId, dirInfo.getParentObjectID(), dirInfo.getFileName());
+      String ozoneDeleteKey = omMetadataManager.getOzoneDeletePathKey(
+          dirInfo.getObjectID(), ozoneDbKey);
+      subDirList.add(Pair.of(ozoneDeleteKey, dirInfo));
+      LOG.debug("Moved sub dir name: {}", dirInfo.getKeyName());
+    }
+
+    // step-2: get all sub files under the deletedDir
+    List<OmKeyInfo> subFiles = keyManager
+        .getPendingDeletionSubFiles(volumeId, bucketId,
+            pendingDeletedDirInfo, remainNum);
+    remainNum = remainNum - subFiles.size();
+
+    if (LOG.isDebugEnabled()) {
+      for (OmKeyInfo fileInfo : subFiles) {
+        LOG.debug("Moved sub file name: {}", fileInfo.getKeyName());
+      }
+    }
+
+    // step-3: Since there is a boundary condition of 'numEntries' in
+    // each batch, check whether the sub paths count reached batch size
+    // limit. If count reached limit then there can be some more child
+    // paths to be visited and will keep the parent deleted directory
+    // for one more pass.
+    String purgeDeletedDir = remainNum > 0 ? delDirName : null;
+    return wrapPurgeRequest(volumeId, bucketId,
+        purgeDeletedDir, subFiles, subDirs);
+  }
+
+  @SuppressWarnings("checkstyle:ParameterNumber")
+  public long optimizeDirDeletesAndSubmitRequest(long remainNum,
+      long dirNum, long subDirNum, long subFileNum,
+      List<Pair<String, OmKeyInfo>> allSubDirList,
+      List<PurgePathRequest> purgePathRequestList,
+      String snapTableKey, long startTime) {
+
+    // Optimization: also expand the sub-dirs collected so far in this run,
+    // so that deep directory trees are cleaned up in fewer iterations.
+    int subdirDelNum = 0;
+    int subDirRecursiveCnt = 0;
+    while (remainNum > 0 && subDirRecursiveCnt < allSubDirList.size()) {
+      try {
+        Pair<String, OmKeyInfo> stringOmKeyInfoPair
+            = allSubDirList.get(subDirRecursiveCnt);
+        PurgePathRequest request = prepareDeleteDirRequest(
+            remainNum, stringOmKeyInfoPair.getValue(),
+            stringOmKeyInfoPair.getKey(), allSubDirList,
+            getOzoneManager().getKeyManager());
+        purgePathRequestList.add(request);
+        remainNum = remainNum - request.getDeletedSubFilesCount();
+        remainNum = remainNum - request.getMarkDeletedSubDirsCount();
+        // Count up the purgeDeletedDir, subDirs and subFiles
+        if (request.getDeletedDir() != null
+            && !request.getDeletedDir().isEmpty()) {
+          subdirDelNum++;
+        }
+        subDirNum += request.getMarkDeletedSubDirsCount();
+        subFileNum += request.getDeletedSubFilesCount();
+        subDirRecursiveCnt++;
+      } catch (IOException e) {
+        LOG.error("Error while running delete directories and files " +
+            "background task. Will retry at next run for subset.", e);
+        break;
+      }
+    }
+
+    // TODO: need to handle delete with non-ratis
+    if (isRatisEnabled()) {
+      submitPurgePaths(purgePathRequestList, snapTableKey);
+    }
+
+    if (dirNum != 0 || subDirNum != 0 || subFileNum != 0) {
+      deletedDirsCount.addAndGet(dirNum + subdirDelNum);
+      movedDirsCount.addAndGet(subDirNum - subdirDelNum);
+      movedFilesCount.addAndGet(subFileNum);
+      LOG.info("Number of dirs deleted: {}, Number of sub-dir " +
+              "deleted: {}, Number of sub-files moved:" +
+              " {} to DeletedTable, Number of sub-dirs moved {} to " +
+              "DeletedDirectoryTable, iteration elapsed: {}ms," +
+              " totalRunCount: {}",
+          dirNum, subdirDelNum, subFileNum, (subDirNum - subdirDelNum),
+          Time.monotonicNow() - startTime, getRunCount());
+    }
+    return remainNum;
+  }
+
   public boolean isRatisEnabled() {
     if (ozoneManager == null) {
       return false;
@@ -244,4 +423,35 @@ public abstract class AbstractKeyDeletingService extends BackgroundService {
   public AtomicLong getRunCount() {
     return runCount;
   }
+
+  /**
+   * Returns the number of dirs deleted by the background service.
+   *
+   * @return Long count.
+   */
+  @VisibleForTesting
+  public long getDeletedDirsCount() {
+    return deletedDirsCount.get();
+  }
+
+  /**
+   * Returns the number of sub-dirs moved by the background service.
+   *
+   * @return Long count.
+   */
+  @VisibleForTesting
+  public long getMovedDirsCount() {
+    return movedDirsCount.get();
+  }
+
+  /**
+   * Returns the number of files moved to DeletedTable by the background
+   * service.
+   *
+   * @return Long count.
+   */
+  @VisibleForTesting
+  public long getMovedFilesCount() {
+    return movedFilesCount.get();
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
index 3a633f2551..ceed9ebe9a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
@@ -16,28 +16,23 @@
  */
 package org.apache.hadoop.ozone.om.service;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ServiceException;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.hdds.utils.BackgroundTask;
 import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.ozone.ClientVersion;
-import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgePathRequest;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,9 +40,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
-import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT;
 
@@ -68,16 +61,10 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_
  * deleted by this service. It will continue traversing until all the leaf path
  * components of an orphan directory is visited.
  */
-public class DirectoryDeletingService extends BackgroundService {
+public class DirectoryDeletingService extends AbstractKeyDeletingService {
   public static final Logger LOG =
       LoggerFactory.getLogger(DirectoryDeletingService.class);
 
-  private final OzoneManager ozoneManager;
-  private final AtomicLong deletedDirsCount;
-  private final AtomicLong movedDirsCount;
-  private final AtomicLong movedFilesCount;
-  private final AtomicLong runCount;
-
   private static ClientId clientId = ClientId.randomId();
 
   // Use only a single thread for DirDeletion. Multiple threads would read
@@ -91,31 +78,19 @@ public class DirectoryDeletingService extends BackgroundService {
   public DirectoryDeletingService(long interval, TimeUnit unit,
       long serviceTimeout, OzoneManager ozoneManager,
       OzoneConfiguration configuration) {
-    super("DirectoryDeletingService", interval, unit,
-        DIR_DELETING_CORE_POOL_SIZE, serviceTimeout);
-    this.ozoneManager = ozoneManager;
-    this.deletedDirsCount = new AtomicLong(0);
-    this.movedDirsCount = new AtomicLong(0);
-    this.movedFilesCount = new AtomicLong(0);
-    this.runCount = new AtomicLong(0);
+    super(DirectoryDeletingService.class.getSimpleName(), interval, unit,
+        DIR_DELETING_CORE_POOL_SIZE, serviceTimeout, ozoneManager, null);
     this.pathLimitPerTask = configuration
         .getInt(OZONE_PATH_DELETING_LIMIT_PER_TASK,
             OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT);
   }
 
   private boolean shouldRun() {
-    if (ozoneManager == null) {
+    if (getOzoneManager() == null) {
       // OzoneManager can be null for testing
       return true;
     }
-    return ozoneManager.isLeaderReady();
-  }
-
-  private boolean isRatisEnabled() {
-    if (ozoneManager == null) {
-      return false;
-    }
-    return ozoneManager.isRatisEnabled();
+    return getOzoneManager().isLeaderReady();
   }
 
   @Override
@@ -138,27 +113,33 @@ public class DirectoryDeletingService extends BackgroundService {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Running DirectoryDeletingService");
         }
-        runCount.incrementAndGet();
-        int dirNum = 0;
-        int subDirNum = 0;
-        int subFileNum = 0;
+        getRunCount().incrementAndGet();
+        long dirNum = 0L;
+        long subDirNum = 0L;
+        long subFileNum = 0L;
         long remainNum = pathLimitPerTask;
         List<PurgePathRequest> purgePathRequestList = new ArrayList<>();
+        List<Pair<String, OmKeyInfo>> allSubDirList
+            = new ArrayList<>((int) remainNum);
 
         Table.KeyValue<String, OmKeyInfo> pendingDeletedDirInfo;
         try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
-                 deleteTableIterator = ozoneManager.getMetadataManager().
+                 deleteTableIterator = getOzoneManager().getMetadataManager().
             getDeletedDirTable().iterator()) {
 
-          List<Pair<String, OmKeyInfo>> allSubDirList
-              = new ArrayList<>((int) remainNum);
           long startTime = Time.monotonicNow();
           while (remainNum > 0 && deleteTableIterator.hasNext()) {
             pendingDeletedDirInfo = deleteTableIterator.next();
+            // Do not reclaim if the directory is still being referenced by
+            // the previous snapshot.
+            if (previousSnapshotHasDir(pendingDeletedDirInfo)) {
+              continue;
+            }
 
             PurgePathRequest request = prepareDeleteDirRequest(
                 remainNum, pendingDeletedDirInfo.getValue(),
-                pendingDeletedDirInfo.getKey(), allSubDirList);
+                pendingDeletedDirInfo.getKey(), allSubDirList,
+                getOzoneManager().getKeyManager());
             purgePathRequestList.add(request);
             remainNum = remainNum - request.getDeletedSubFilesCount();
             remainNum = remainNum - request.getMarkDeletedSubDirsCount();
@@ -170,53 +151,9 @@ public class DirectoryDeletingService extends BackgroundService {
             subDirNum += request.getMarkDeletedSubDirsCount();
             subFileNum += request.getDeletedSubFilesCount();
           }
-          
-          // Optimization to handle delete sub-dir and keys to remove quickly
-          // This case will be useful to handle when depth of directory is high
-          int subdirDelNum = 0;
-          int subDirRecursiveCnt = 0;
-          while (remainNum > 0 && subDirRecursiveCnt < allSubDirList.size()) {
-            try {
-              Pair<String, OmKeyInfo> stringOmKeyInfoPair
-                  = allSubDirList.get(subDirRecursiveCnt);
-              PurgePathRequest request = prepareDeleteDirRequest(
-                  remainNum, stringOmKeyInfoPair.getValue(),
-                  stringOmKeyInfoPair.getKey(), allSubDirList);
-              purgePathRequestList.add(request);
-              remainNum = remainNum - request.getDeletedSubFilesCount();
-              remainNum = remainNum - request.getMarkDeletedSubDirsCount();
-              // Count up the purgeDeletedDir, subDirs and subFiles
-              if (request.getDeletedDir() != null
-                  && !request.getDeletedDir().isEmpty()) {
-                subdirDelNum++;
-              }
-              subDirNum += request.getMarkDeletedSubDirsCount();
-              subFileNum += request.getDeletedSubFilesCount();
-              subDirRecursiveCnt++;
-            } catch (IOException e) {
-              LOG.error("Error while running delete directories and files " +
-                  "background task. Will retry at next run for subset.", e);
-              break;
-            }
-          }
 
-          // TODO: need to handle delete with non-ratis
-          if (isRatisEnabled()) {
-            submitPurgePaths(purgePathRequestList);
-          }
-
-          if (dirNum != 0 || subDirNum != 0 || subFileNum != 0) {
-            deletedDirsCount.addAndGet(dirNum + subdirDelNum);
-            movedDirsCount.addAndGet(subDirNum - subdirDelNum);
-            movedFilesCount.addAndGet(subFileNum);
-            LOG.info("Number of dirs deleted: {}, Number of sub-dir " +
-                    "deleted: {}, Number of sub-files moved:" +
-                    " {} to DeletedTable, Number of sub-dirs moved {} to " +
-                    "DeletedDirectoryTable, iteration elapsed: {}ms," +
-                    " totalRunCount: {}",
-                dirNum, subdirDelNum, subFileNum, (subDirNum - subdirDelNum),
-                Time.monotonicNow() - startTime, getRunCount());
-          }
+          optimizeDirDeletesAndSubmitRequest(remainNum, dirNum, subDirNum,
+              subFileNum, allSubDirList, purgePathRequestList, null, startTime);
 
         } catch (IOException e) {
           LOG.error("Error while running delete directories and files " +
@@ -227,165 +164,32 @@ public class DirectoryDeletingService extends BackgroundService {
       // place holder by returning empty results of this call back.
       return BackgroundTaskResult.EmptyTaskResult.newResult();
     }
-  }
-  
-  private PurgePathRequest prepareDeleteDirRequest(
-      long remainNum, OmKeyInfo pendingDeletedDirInfo, String delDirName,
-      List<Pair<String, OmKeyInfo>> subDirList) throws IOException {
-    // step-0: Get one pending deleted directory
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Pending deleted dir name: {}",
-          pendingDeletedDirInfo.getKeyName());
-    }
-    
-    final String[] keys = delDirName.split(OM_KEY_PREFIX);
-    final long volumeId = Long.parseLong(keys[1]);
-    final long bucketId = Long.parseLong(keys[2]);
-    
-    // step-1: get all sub directories under the deletedDir
-    List<OmKeyInfo> subDirs = ozoneManager.getKeyManager()
-        .getPendingDeletionSubDirs(volumeId, bucketId,
-            pendingDeletedDirInfo, remainNum);
-    remainNum = remainNum - subDirs.size();
-
-    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
-    for (OmKeyInfo dirInfo : subDirs) {
-      String ozoneDbKey = omMetadataManager.getOzonePathKey(volumeId,
-          bucketId, dirInfo.getParentObjectID(), dirInfo.getFileName());
-      String ozoneDeleteKey = omMetadataManager.getOzoneDeletePathKey(
-          dirInfo.getObjectID(), ozoneDbKey);
-      subDirList.add(Pair.of(ozoneDeleteKey, dirInfo));
-      LOG.debug("Moved sub dir name: {}", dirInfo.getKeyName());
-    }
-
-    // step-2: get all sub files under the deletedDir
-    List<OmKeyInfo> subFiles = ozoneManager.getKeyManager()
-        .getPendingDeletionSubFiles(volumeId, bucketId,
-            pendingDeletedDirInfo, remainNum);
-    remainNum = remainNum - subFiles.size();
 
-    if (LOG.isDebugEnabled()) {
-      for (OmKeyInfo fileInfo : subFiles) {
-        LOG.debug("Moved sub file name: {}", fileInfo.getKeyName());
+    private boolean previousSnapshotHasDir(
+        KeyValue<String, OmKeyInfo> pendingDeletedDirInfo) throws IOException {
+      String key = pendingDeletedDirInfo.getKey();
+      OmKeyInfo deletedDirInfo = pendingDeletedDirInfo.getValue();
+      OmSnapshotManager omSnapshotManager =
+          getOzoneManager().getOmSnapshotManager();
+      OmMetadataManagerImpl metadataManager = (OmMetadataManagerImpl)
+          getOzoneManager().getMetadataManager();
+
+      OmSnapshot latestSnapshot =
+          metadataManager.getLatestSnapshot(deletedDirInfo.getVolumeName(),
+              deletedDirInfo.getBucketName(), omSnapshotManager);
+
+      if (latestSnapshot != null) {
+        Table<String, OmDirectoryInfo> prevDirTable =
+            latestSnapshot.getMetadataManager().getDirectoryTable();
+        // In OMKeyDeleteResponseWithFSO OzonePathKey is converted to
+        // OzoneDeletePathKey. Changing it back to check the previous DirTable.
+        String prevDbKey = metadataManager.getOzoneDeletePathDirKey(key);
+        OmDirectoryInfo prevDirInfo = prevDirTable.get(prevDbKey);
+        return prevDirInfo != null &&
+            prevDirInfo.getObjectID() == deletedDirInfo.getObjectID();
       }
+      return false;
     }
-
-    // step-3: Since there is a boundary condition of 'numEntries' in
-    // each batch, check whether the sub paths count reached batch size
-    // limit. If count reached limit then there can be some more child
-    // paths to be visited and will keep the parent deleted directory
-    // for one more pass.
-    String purgeDeletedDir = remainNum > 0 ? delDirName : null;
-    return wrapPurgeRequest(volumeId, bucketId,
-        purgeDeletedDir, subFiles, subDirs);
-  }
-
-  /**
-   * Returns the number of dirs deleted by the background service.
-   *
-   * @return Long count.
-   */
-  @VisibleForTesting
-  public long getDeletedDirsCount() {
-    return deletedDirsCount.get();
-  }
-
-  /**
-   * Returns the number of sub-dirs deleted by the background service.
-   *
-   * @return Long count.
-   */
-  @VisibleForTesting
-  public long getMovedDirsCount() {
-    return movedDirsCount.get();
-  }
-
-  /**
-   * Returns the number of files moved to DeletedTable by the background
-   * service.
-   *
-   * @return Long count.
-   */
-  @VisibleForTesting
-  public long getMovedFilesCount() {
-    return movedFilesCount.get();
-  }
-
-  /**
-   * Returns the number of times this Background service has run.
-   *
-   * @return Long, run count.
-   */
-  @VisibleForTesting
-  public long getRunCount() {
-    return runCount.get();
-  }
-
-  private void submitPurgePaths(List<PurgePathRequest> requests) {
-    OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest =
-        OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder();
-    purgeDirRequest.addAllDeletedPath(requests);
-
-    OzoneManagerProtocolProtos.OMRequest omRequest =
-        OzoneManagerProtocolProtos.OMRequest.newBuilder()
-            .setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories)
-            .setPurgeDirectoriesRequest(purgeDirRequest)
-            .setClientId(clientId.toString())
-            .build();
-
-    // Submit Purge paths request to OM
-    try {
-      RaftClientRequest raftClientRequest =
-          createRaftClientRequestForDelete(omRequest);
-      ozoneManager.getOmRatisServer().submitRequest(omRequest,
-          raftClientRequest);
-    } catch (ServiceException e) {
-      LOG.error("PurgePaths request failed. Will retry at next run.");
-    }
-  }
-
-  private PurgePathRequest wrapPurgeRequest(final long volumeId,
-      final long bucketId,
-      final String purgeDeletedDir,
-      final List<OmKeyInfo> purgeDeletedFiles,
-      final List<OmKeyInfo> markDirsAsDeleted) {
-    // Put all keys to be purged in a list
-    PurgePathRequest.Builder purgePathsRequest = PurgePathRequest.newBuilder();
-    purgePathsRequest.setVolumeId(volumeId);
-    purgePathsRequest.setBucketId(bucketId);
-
-    if (purgeDeletedDir != null) {
-      purgePathsRequest.setDeletedDir(purgeDeletedDir);
-    }
-
-    for (OmKeyInfo purgeFile : purgeDeletedFiles) {
-      purgePathsRequest.addDeletedSubFiles(
-          purgeFile.getProtobuf(true, ClientVersion.CURRENT_VERSION));
-    }
-
-    // Add these directories to deletedDirTable, so that its sub-paths will be
-    // traversed in next iteration to ensure cleanup all sub-children.
-    for (OmKeyInfo dir : markDirsAsDeleted) {
-      purgePathsRequest.addMarkDeletedSubDirs(
-          dir.getProtobuf(ClientVersion.CURRENT_VERSION));
-    }
-
-    return purgePathsRequest.build();
-  }
-
-
-  private RaftClientRequest createRaftClientRequestForDelete(
-      OzoneManagerProtocolProtos.OMRequest omRequest) {
-    return RaftClientRequest.newBuilder()
-        .setClientId(clientId)
-        .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
-        .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
-        .setCallId(runCount.get())
-        .setMessage(
-            Message.valueOf(
-                OMRatisHelper.convertRequestToByteString(omRequest)))
-        .setType(RaftClientRequest.writeRequestType())
-        .build();
   }
 
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
index 446881bfec..40caf08954 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.om.service;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ServiceException;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
@@ -36,18 +37,20 @@ import org.apache.hadoop.ozone.om.OmSnapshot;
 import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.SnapshotChainManager;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgePathRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveDeletedKeysRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotPurgeRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.util.Time;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
@@ -61,13 +64,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConsts.OBJECT_ID_RECLAIM_BLOCKS;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DELETING_LIMIT_PER_TASK;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DELETING_LIMIT_PER_TASK_DEFAULT;
 import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
-import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 
 /**
  * Background Service to clean-up deleted snapshot and reclaim space.
@@ -150,11 +153,8 @@ public class SnapshotDeletingService extends AbstractKeyDeletingService {
 
           Table<String, RepeatedOmKeyInfo> snapshotDeletedTable =
               omSnapshot.getMetadataManager().getDeletedTable();
-
-          // TODO: [SNAPSHOT] Check if deletedDirTable is empty.
-          if (snapshotDeletedTable.isEmpty()) {
-            continue;
-          }
+          Table<String, OmKeyInfo> snapshotDeletedDirTable =
+              omSnapshot.getMetadataManager().getDeletedDirTable();
 
           Table<String, String> renamedKeyTable =
               omSnapshot.getMetadataManager().getSnapshotRenamedKeyTable();
@@ -168,13 +168,27 @@ public class SnapshotDeletingService extends AbstractKeyDeletingService {
               .getBucketTable().get(dbBucketKey);
 
           if (bucketInfo == null) {
-            throw new OMException("Bucket " + snapInfo.getBucketName() +
-                " is not found", BUCKET_NOT_FOUND);
+            throw new IllegalStateException("Bucket " + "/" +
+                snapInfo.getVolumeName() + "/" + snapInfo.getBucketName() +
+                " is not found. BucketInfo should not be null for snapshotted" +
+                " bucket. The OM is in unexpected state.");
+          }
+
+          String snapshotBucketKey = dbBucketKey + OzoneConsts.OM_KEY_PREFIX;
+          String dbBucketKeyForDir = ozoneManager.getMetadataManager()
+              .getBucketKey(Long.toString(volumeId),
+                  Long.toString(bucketInfo.getObjectID())) + OM_KEY_PREFIX;
+
+          if (checkSnapshotReclaimable(snapshotDeletedTable,
+              snapshotDeletedDirTable, snapshotBucketKey, dbBucketKeyForDir)) {
+            purgeSnapshotKeys.add(snapInfo.getTableKey());
+            continue;
           }
 
           //TODO: [SNAPSHOT] Add lock to deletedTable and Active DB.
           SnapshotInfo previousSnapshot = getPreviousSnapshot(snapInfo);
           Table<String, OmKeyInfo> previousKeyTable = null;
+          Table<String, OmDirectoryInfo> previousDirTable = null;
           OmSnapshot omPreviousSnapshot = null;
 
           // Split RepeatedOmKeyInfo and update current snapshot deletedKeyTable
@@ -187,6 +201,8 @@ public class SnapshotDeletingService extends AbstractKeyDeletingService {
 
             previousKeyTable = omPreviousSnapshot
                 .getMetadataManager().getKeyTable(bucketInfo.getBucketLayout());
+            previousDirTable = omPreviousSnapshot
+                .getMetadataManager().getDirectoryTable();
           }
 
           // Move key to either next non deleted snapshot's deletedTable
@@ -194,18 +210,22 @@ public class SnapshotDeletingService extends AbstractKeyDeletingService {
           List<SnapshotMoveKeyInfos> toReclaimList = new ArrayList<>();
           List<SnapshotMoveKeyInfos> toNextDBList = new ArrayList<>();
           List<HddsProtos.KeyValue> renamedKeysList = new ArrayList<>();
+          List<String> dirsToMove = new ArrayList<>();
+
+          long remainNum = handleDirectoryCleanUp(snapshotDeletedDirTable,
+              previousDirTable, dbBucketKeyForDir, snapInfo, omSnapshot,
+              dirsToMove);
+          int deletionCount = 0;
 
           try (TableIterator<String, ? extends Table.KeyValue<String,
               RepeatedOmKeyInfo>> deletedIterator = snapshotDeletedTable
               .iterator()) {
 
             List<BlockGroup> keysToPurge = new ArrayList<>();
-            String snapshotBucketKey = dbBucketKey + OzoneConsts.OM_KEY_PREFIX;
-            iterator.seek(snapshotBucketKey);
+            deletedIterator.seek(snapshotBucketKey);
 
-            int deletionCount = 0;
             while (deletedIterator.hasNext() &&
-                deletionCount < keyLimitPerSnapshot) {
+                deletionCount < remainNum) {
               Table.KeyValue<String, RepeatedOmKeyInfo>
                   deletedKeyValue = deletedIterator.next();
               String deletedKey = deletedKeyValue.getKey();
@@ -214,8 +234,6 @@ public class SnapshotDeletingService extends AbstractKeyDeletingService {
               if (!deletedKey.startsWith(snapshotBucketKey)) {
                 // If snapshot deletedKeyTable doesn't have any
                 // entry in the snapshot scope it can be reclaimed
-                // TODO: [SNAPSHOT] Check deletedDirTable to be empty.
-                purgeSnapshotKeys.add(snapInfo.getTableKey());
                 break;
               }
 
@@ -257,20 +275,23 @@ public class SnapshotDeletingService extends AbstractKeyDeletingService {
               }
               deletionCount++;
             }
-            // Submit Move request to OM.
-            submitSnapshotMoveDeletedKeys(snapInfo, toReclaimList,
-                toNextDBList, renamedKeysList);
 
             // Delete keys From deletedTable
             processKeyDeletes(keysToPurge, omSnapshot.getKeyManager(),
                 snapInfo.getTableKey());
-            snapshotLimit--;
             successRunCount.incrementAndGet();
           } catch (IOException ex) {
-            LOG.error("Error while running Snapshot Deleting Service", ex);
+            LOG.error("Error while running Snapshot Deleting Service for " +
+                "snapshot " + snapInfo.getTableKey() + " with snapshotId " +
+                snapInfo.getSnapshotID() + ". Processed " + deletionCount +
+                " keys and " + (keyLimitPerSnapshot - remainNum) +
+                " directories and files", ex);
           }
+          snapshotLimit--;
+          // Submit Move request to OM.
+          submitSnapshotMoveDeletedKeys(snapInfo, toReclaimList,
+              toNextDBList, renamedKeysList, dirsToMove);
         }
-
         submitSnapshotPurgeRequest(purgeSnapshotKeys);
       } catch (IOException e) {
         LOG.error("Error while running Snapshot Deleting Service", e);
@@ -279,6 +300,88 @@ public class SnapshotDeletingService extends AbstractKeyDeletingService {
       return BackgroundTaskResult.EmptyTaskResult.newResult();
     }
 
+    /**
+     * Returns true when neither the deletedTable nor the deletedDirTable of
+     * the snapshot still holds an entry under the given bucket prefixes.
+     */
+    private boolean checkSnapshotReclaimable(
+        Table<String, RepeatedOmKeyInfo> snapshotDeletedTable,
+        Table<String, OmKeyInfo> snapshotDeletedDirTable,
+        String snapshotBucketKey, String dbBucketKeyForDir) throws IOException {
+
+      boolean isKeyTableCleanedUp;
+      try (TableIterator<String, ? extends Table.KeyValue<String,
+          RepeatedOmKeyInfo>> iterator = snapshotDeletedTable.iterator()) {
+        iterator.seek(snapshotBucketKey);
+        // Cleaned up means no remaining entry starts with the bucket prefix.
+        isKeyTableCleanedUp = !iterator.hasNext() || !iterator.next().getKey()
+            .startsWith(snapshotBucketKey);
+      }
+      try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
+               iterator = snapshotDeletedDirTable.iterator()) {
+        iterator.seek(dbBucketKeyForDir);
+        return isKeyTableCleanedUp && (!iterator.hasNext() || !iterator.next()
+            .getKey().startsWith(dbBucketKeyForDir));
+      }
+    }
+
+    /**
+     * Submits purge requests for reclaimable in-scope deletedDirTable entries,
+     * collects the rest in dirsToMove and returns the remaining key limit.
+     */
+    private long handleDirectoryCleanUp(
+        Table<String, OmKeyInfo> snapshotDeletedDirTable,
+        Table<String, OmDirectoryInfo> previousDirTable,
+        String dbBucketKeyForDir, SnapshotInfo snapInfo, OmSnapshot omSnapshot,
+        List<String> dirsToMove) {
+      long dirNum = 0L;
+      long subDirNum = 0L;
+      long subFileNum = 0L;
+      long remainNum = keyLimitPerSnapshot;
+      List<PurgePathRequest> purgePathRequestList = new ArrayList<>();
+      List<Pair<String, OmKeyInfo>> allSubDirList
+          = new ArrayList<>(keyLimitPerSnapshot);
+      try (TableIterator<String, ? extends
+          Table.KeyValue<String, OmKeyInfo>> deletedDirIterator =
+               snapshotDeletedDirTable.iterator()) {
+        long startTime = Time.monotonicNow();
+        deletedDirIterator.seek(dbBucketKeyForDir);
+        while (deletedDirIterator.hasNext()) {
+          Table.KeyValue<String, OmKeyInfo> deletedDir =
+              deletedDirIterator.next();
+          // Stop at the first entry outside this bucket's scope.
+          if (!deletedDir.getKey().startsWith(dbBucketKeyForDir)) {
+            break;
+          }
+          if (checkDirReclaimable(deletedDir, previousDirTable)) {
+            PurgePathRequest request = prepareDeleteDirRequest(
+                remainNum, deletedDir.getValue(), deletedDir.getKey(),
+                allSubDirList, omSnapshot.getKeyManager());
+            purgePathRequestList.add(request);
+            remainNum = remainNum - request.getDeletedSubFilesCount();
+            remainNum = remainNum - request.getMarkDeletedSubDirsCount();
+            // Count up the purgeDeletedDir, subDirs and subFiles
+            if (request.getDeletedDir() != null
+                && !request.getDeletedDir().isEmpty()) {
+              dirNum++;
+            }
+            subDirNum += request.getMarkDeletedSubDirsCount();
+            subFileNum += request.getDeletedSubFilesCount();
+          } else {
+            dirsToMove.add(deletedDir.getKey());
+          }
+        }
+        remainNum = optimizeDirDeletesAndSubmitRequest(remainNum, dirNum,
+            subDirNum, subFileNum, allSubDirList, purgePathRequestList,
+            snapInfo.getTableKey(), startTime);
+      } catch (IOException e) {
+        LOG.error("Error while running delete directories and files for " +
+            "snapshot " + snapInfo.getTableKey() + " in snapshot deleting " +
+            "background task. Will retry at next run.", e);
+      }
+      return remainNum;
+    }
+
     private void submitSnapshotPurgeRequest(List<String> purgeSnapshotKeys) {
       if (!purgeSnapshotKeys.isEmpty()) {
         SnapshotPurgeRequest snapshotPurgeRequest = SnapshotPurgeRequest
@@ -321,7 +424,8 @@ public class SnapshotDeletingService extends AbstractKeyDeletingService {
     private void submitSnapshotMoveDeletedKeys(SnapshotInfo snapInfo,
         List<SnapshotMoveKeyInfos> toReclaimList,
         List<SnapshotMoveKeyInfos> toNextDBList,
-        List<HddsProtos.KeyValue> renamedKeysList) {
+        List<HddsProtos.KeyValue> renamedKeysList,
+        List<String> dirsToMove) {
 
       SnapshotMoveDeletedKeysRequest.Builder moveDeletedKeysBuilder =
           SnapshotMoveDeletedKeysRequest.newBuilder()
@@ -331,6 +435,7 @@ public class SnapshotDeletingService extends AbstractKeyDeletingService {
           .addAllReclaimKeys(toReclaimList)
           .addAllNextDBKeys(toNextDBList)
           .addAllRenamedKeys(renamedKeysList)
+          .addAllDeletedDirsToMove(dirsToMove)
           .build();
 
       OMRequest omRequest = OMRequest.newBuilder()
@@ -342,6 +447,28 @@ public class SnapshotDeletingService extends AbstractKeyDeletingService {
       submitRequest(omRequest);
     }
 
+    /**
+     * A deleted directory is reclaimable unless the previous snapshot's
+     * directory table still holds an entry with the same objectID.
+     */
+    private boolean checkDirReclaimable(
+        Table.KeyValue<String, OmKeyInfo> deletedDir,
+        Table<String, OmDirectoryInfo> previousDirTable) throws IOException {
+      // Without a previous directory table there is nothing to compare to.
+      if (previousDirTable == null) {
+        return true;
+      }
+
+      String deletedKey = deletedDir.getKey();
+      OmKeyInfo deletedInfo = deletedDir.getValue();
+      // OMKeyDeleteResponseWithFSO stores the OzonePathKey converted to an
+      // OzoneDeletePathKey; convert it back to look up the previous DirTable.
+      OmDirectoryInfo prevInfo = previousDirTable.get(ozoneManager
+          .getMetadataManager().getOzoneDeletePathDirKey(deletedKey));
+      return prevInfo == null
+          || prevInfo.getObjectID() != deletedInfo.getObjectID();
+    }
+
     private boolean checkKeyReclaimable(
         Table<String, OmKeyInfo> previousKeyTable,
         Table<String, String> renamedKeyTable,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org