Posted to commits@ozone.apache.org by ra...@apache.org on 2021/04/07 03:25:55 UTC

[ozone] 28/29: HDDS-4495. [FSO]Delete : Implement async cleanup of garbage and orphan sub-dirs/files (#2093)

This is an automated email from the ASF dual-hosted git repository.

rakeshr pushed a commit to branch HDDS-2939
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 055771e1319bccb4f69e62f9bde7e7fbbbdf66ca
Author: Rakesh Radhakrishnan <ra...@apache.org>
AuthorDate: Tue Apr 6 15:36:18 2021 +0530

    HDDS-4495. [FSO]Delete : Implement async cleanup of garbage and orphan sub-dirs/files (#2093)
---
 .../common/src/main/resources/ozone-default.xml    |  19 ++
 .../main/java/org/apache/hadoop/ozone/OmUtils.java |   1 +
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |  13 +
 .../TestDirectoryDeletingServiceWithFSOBucket.java | 318 +++++++++++++++++++++
 .../src/main/proto/OmClientProtocol.proto          |  15 +
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |   8 +
 .../hadoop/ozone/om/DirectoryDeletingService.java  | 286 ++++++++++++++++++
 .../org/apache/hadoop/ozone/om/KeyManager.java     |  56 ++++
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 112 ++++++++
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  14 +
 .../hadoop/ozone/om/codec/OMDBDefinition.java      |   9 +-
 .../om/ratis/utils/OzoneManagerRatisUtils.java     |   3 +
 .../ozone/om/request/key/OMKeyDeleteRequestV1.java |   6 +-
 .../om/request/key/OMPathsPurgeRequestV1.java      |  64 +++++
 .../om/response/key/OMKeyDeleteResponseV1.java     |  12 +-
 .../om/response/key/OMPathsPurgeResponseV1.java    | 121 ++++++++
 .../om/response/key/TestOMKeyDeleteResponseV1.java |   4 +-
 17 files changed, 1053 insertions(+), 8 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 89e07de..665ccd9 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2770,4 +2770,23 @@
       existing unit test cases won't be affected. New OM version should be 'V1'
     </description>
   </property>
+  <property>
+    <name>ozone.directory.deleting.service.interval</name>
+    <value>1m</value>
+    <tag>OZONE, PERFORMANCE, OM</tag>
+    <description>Time interval of the directory deleting service. It runs on
+      the OM periodically and cleans up orphan directories and their sub-trees.
+      For every orphan directory it deletes the sub-path tree structure
+      (dirs/files). It sends sub-files to the KeyDeletingService so that their
+      blocks are deleted. The unit can be defined with a postfix (ns,ms,s,m,h,d).
+    </description>
+  </property>
+  <property>
+    <name>ozone.path.deleting.limit.per.task</name>
+    <value>10000</value>
+    <tag>OZONE, PERFORMANCE, OM</tag>
+    <description>The maximum number of paths (dirs/files) to be deleted by
+      the directory deleting service per time interval.
+    </description>
+  </property>
 </configuration>
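
For illustration only (not part of this patch): a minimal sketch of setting the
two new properties programmatically on an OzoneConfiguration, mirroring what the
integration test below does. The class name and the chosen values are arbitrary
examples.

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;

    public class DirDeletingConfigExample {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        // Run the directory deleting service every 30 seconds (default is 1m).
        conf.setTimeDuration("ozone.directory.deleting.service.interval",
            30, TimeUnit.SECONDS);
        // Purge at most 2000 paths (dirs + files) per run (default is 10000).
        conf.setInt("ozone.path.deleting.limit.per.task", 2000);
      }
    }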
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index c389bec..5fedd83 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -289,6 +289,7 @@ public final class OmUtils {
     case PurgeKeys:
     case RecoverTrash:
     case DeleteOpenKeys:
+    case PurgePaths:
       return false;
     default:
       LOG.error("CmdType {} is not categorized as readOnly or not.", cmdType);
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 71344f9..6a64818 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -253,4 +253,17 @@ public final class OMConfigKeys {
   public static final String OZONE_OM_LAYOUT_VERSION_DEFAULT = "V0";
 
   public static final String OZONE_OM_LAYOUT_VERSION_V1 = "V1";
+
+  /**
+   * Configuration properties for Directory Deleting Service.
+   */
+  public static final String OZONE_DIR_DELETING_SERVICE_INTERVAL =
+      "ozone.directory.deleting.service.interval";
+  public static final String OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT
+      = "60s";
+
+  public static final String OZONE_PATH_DELETING_LIMIT_PER_TASK =
+      "ozone.path.deleting.limit.per.task";
+  public static final int OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT = 10000;
+
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSOBucket.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSOBucket.java
new file mode 100644
index 0000000..aa2d8e7
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSOBucket.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.ozone;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.TestDataUtil;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.om.DirectoryDeletingService;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_ITERATE_BATCH_SIZE;
+import static org.junit.Assert.fail;
+
+/**
+ * Directory deletion service test cases.
+ */
+public class TestDirectoryDeletingServiceWithFSOBucket {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestDirectoryDeletingServiceWithFSOBucket.class);
+
+  /**
+   * Set a timeout for each test.
+   */
+  @Rule
+  public Timeout timeout = Timeout.seconds(300);
+
+  private static boolean isBucketFSOptimized = true;
+  private static boolean enabledFileSystemPaths = true;
+  private static boolean omRatisEnabled = true;
+
+  private static MiniOzoneCluster cluster;
+  private static FileSystem fs;
+  private static String volumeName;
+  private static String bucketName;
+
+  @BeforeClass
+  public static void init() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL, 3);
+    conf.setInt(OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK, 5);
+    conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, omRatisEnabled);
+    conf.setBoolean(OZONE_ACL_ENABLED, true);
+    if (isBucketFSOptimized) {
+      TestOMRequestUtils.configureFSOptimizedPaths(conf,
+          enabledFileSystemPaths, OMConfigKeys.OZONE_OM_LAYOUT_VERSION_V1);
+    } else {
+      conf.setBoolean(OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS,
+          enabledFileSystemPaths);
+    }
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(3)
+        .build();
+    cluster.waitForClusterToBeReady();
+
+    // create a volume and a bucket to be used by OzoneFileSystem
+    OzoneBucket bucket = TestDataUtil.createVolumeAndBucket(cluster);
+    volumeName = bucket.getVolumeName();
+    bucketName = bucket.getName();
+
+    String rootPath = String.format("%s://%s.%s/",
+        OzoneConsts.OZONE_URI_SCHEME, bucketName, volumeName);
+
+    // Set the fs.defaultFS and start the filesystem
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+    // Set the number of keys to be processed during batch operations.
+    conf.setInt(OZONE_FS_ITERATE_BATCH_SIZE, 5);
+
+    fs = FileSystem.get(conf);
+  }
+
+  @AfterClass
+  public static void teardown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    IOUtils.closeQuietly(fs);
+  }
+
+  @After
+  public void cleanup() {
+    try {
+      Path root = new Path("/");
+      FileStatus[] fileStatuses = fs.listStatus(root);
+      for (FileStatus fileStatus : fileStatuses) {
+        fs.delete(fileStatus.getPath(), true);
+      }
+    } catch (IOException ex) {
+      fail("Failed to cleanup files.");
+    }
+  }
+
+  @Test
+  public void testDeleteEmptyDirectory() throws Exception {
+    Path root = new Path("/rootDir");
+    Path appRoot = new Path(root, "appRoot");
+    fs.mkdirs(appRoot);
+
+    Table<String, OmKeyInfo> deletedDirTable =
+        cluster.getOzoneManager().getMetadataManager().getDeletedDirTable();
+    Table<String, OmDirectoryInfo> dirTable =
+        cluster.getOzoneManager().getMetadataManager().getDirectoryTable();
+
+
+    DirectoryDeletingService dirDeletingService =
+        (DirectoryDeletingService) cluster.getOzoneManager().getKeyManager()
+            .getDirDeletingService();
+    // Before delete
+    assertTableRowCount(deletedDirTable, 0);
+    assertTableRowCount(dirTable, 2);
+
+    assertSubPathsCount(dirDeletingService.getDeletedDirsCount(), 0);
+    assertSubPathsCount(dirDeletingService.getMovedFilesCount(), 0);
+
+    // Delete the appRoot, empty dir
+    fs.delete(appRoot, true);
+
+    // After Delete
+    checkPath(appRoot);
+
+    assertTableRowCount(deletedDirTable, 0);
+    assertTableRowCount(dirTable, 1);
+
+    assertSubPathsCount(dirDeletingService.getDeletedDirsCount(), 1);
+    assertSubPathsCount(dirDeletingService.getMovedFilesCount(), 0);
+
+    Assert.assertTrue(dirTable.iterator().hasNext());
+    Assert.assertEquals(root.getName(),
+        dirTable.iterator().next().getValue().getName());
+
+    Assert.assertTrue(dirDeletingService.getRunCount() > 1);
+  }
+
+  /**
+   * Verifies that directories and files are purged in multiple
+   * batches.
+   */
+  @Test
+  public void testDeleteWithLargeSubPathsThanBatchSize() throws Exception {
+    Path root = new Path("/rootDir");
+    Path appRoot = new Path(root, "appRoot");
+    // Creates 2 parent dirs from root.
+    fs.mkdirs(appRoot);
+
+    // Create 2 more levels. In each parent dir, create 5 sub-dirs and
+    // 5 sub-files. This creates a total of 3 parentDirs + (3 * 5) childDirs
+    // and a total of (3 * 5) childFiles.
+    for (int i = 1; i <= 3; i++) {
+      Path childDir = new Path(appRoot, "parentDir" + i);
+      for (int j = 1; j <= 5; j++) {
+        // total 5 sub-dirs + 5 sub-files = 10 items in this level.
+        Path childSubDir = new Path(childDir, "childDir" + j);
+        Path childSubFile = new Path(childDir, "childFile" + j);
+        ContractTestUtils.touch(fs, childSubFile); // create sub file
+        fs.mkdirs(childSubDir); // create sub dir
+      }
+    }
+
+    Table<String, OmKeyInfo> deletedDirTable =
+        cluster.getOzoneManager().getMetadataManager().getDeletedDirTable();
+    Table<String, OmKeyInfo> keyTable =
+        cluster.getOzoneManager().getMetadataManager().getKeyTable();
+    Table<String, OmDirectoryInfo> dirTable =
+        cluster.getOzoneManager().getMetadataManager().getDirectoryTable();
+
+    DirectoryDeletingService dirDeletingService =
+        (DirectoryDeletingService) cluster.getOzoneManager().getKeyManager()
+            .getDirDeletingService();
+
+    // Before delete
+    assertTableRowCount(deletedDirTable, 0);
+    assertTableRowCount(keyTable, 15);
+    assertTableRowCount(dirTable, 20);
+
+    assertSubPathsCount(dirDeletingService.getMovedFilesCount(), 0);
+    assertSubPathsCount(dirDeletingService.getDeletedDirsCount(), 0);
+
+    // Delete the appRoot
+    fs.delete(appRoot, true);
+
+    // After Delete
+    checkPath(appRoot);
+
+    assertTableRowCount(deletedDirTable, 0);
+    assertTableRowCount(keyTable, 0);
+    assertTableRowCount(dirTable, 1);
+
+    assertSubPathsCount(dirDeletingService.getMovedFilesCount(), 15);
+    assertSubPathsCount(dirDeletingService.getDeletedDirsCount(), 19);
+
+    Assert.assertTrue(dirDeletingService.getRunCount() > 1);
+  }
+
+  @Test
+  public void testDeleteWithMultiLevels() throws Exception {
+    Path root = new Path("/rootDir");
+    Path appRoot = new Path(root, "appRoot");
+
+    for (int i = 1; i <= 3; i++) {
+      Path parent = new Path(appRoot, "parentDir" + i);
+      Path child = new Path(parent, "childFile");
+      ContractTestUtils.touch(fs, child);
+    }
+
+    Table<String, OmKeyInfo> deletedDirTable =
+        cluster.getOzoneManager().getMetadataManager().getDeletedDirTable();
+    Table<String, OmKeyInfo> keyTable =
+        cluster.getOzoneManager().getMetadataManager().getKeyTable();
+    Table<String, OmDirectoryInfo> dirTable =
+        cluster.getOzoneManager().getMetadataManager().getDirectoryTable();
+
+    DirectoryDeletingService dirDeletingService =
+        (DirectoryDeletingService) cluster.getOzoneManager().getKeyManager()
+            .getDirDeletingService();
+
+    // Before delete
+    assertTableRowCount(deletedDirTable, 0);
+    assertTableRowCount(dirTable, 5);
+    assertTableRowCount(keyTable, 3);
+
+    assertSubPathsCount(dirDeletingService.getMovedFilesCount(), 0);
+    assertSubPathsCount(dirDeletingService.getDeletedDirsCount(), 0);
+
+    // Delete the rootDir, which should delete all keys.
+    fs.delete(root, true);
+
+    // After Delete
+    checkPath(root);
+
+    assertTableRowCount(deletedDirTable, 0);
+    assertTableRowCount(keyTable, 0);
+    assertTableRowCount(dirTable, 0);
+
+    assertSubPathsCount(dirDeletingService.getMovedFilesCount(), 3);
+    assertSubPathsCount(dirDeletingService.getDeletedDirsCount(), 5);
+
+    Assert.assertTrue(dirDeletingService.getRunCount() > 1);
+  }
+
+  private void assertSubPathsCount(long pathCount, long expectedCount)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(() -> pathCount >= expectedCount, 1000, 120000);
+  }
+
+  private void assertTableRowCount(Table<String, ?> table, int count)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(() -> assertTableRowCount(count, table), 1000,
+        120000); // 2 minutes
+  }
+
+  private boolean assertTableRowCount(int expectedCount,
+                                      Table<String, ?> table) {
+    long count = 0L;
+    try {
+      count = cluster.getOzoneManager().getMetadataManager()
+          .countRowsInTable(table);
+      LOG.info("{} actual row count={}, expectedCount={}", table.getName(),
+          count, expectedCount);
+    } catch (IOException ex) {
+      fail("countRowsInTable failed with: " + ex);
+    }
+    return count == expectedCount;
+  }
+
+  private void checkPath(Path path) {
+    try {
+      fs.getFileStatus(path);
+      fail("Path should have been deleted: " + path);
+    } catch (IOException ex) {
+      Assert.assertTrue(ex instanceof FileNotFoundException);
+      Assert.assertTrue(ex.getMessage().contains("No such file or directory"));
+    }
+  }
+
+}
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 2bae4e5..3bacb0e 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -93,6 +93,8 @@ enum Type {
 
   ListTrash = 91;
   RecoverTrash = 92;
+
+  PurgePaths = 93;
 }
 
 message OMRequest {
@@ -165,6 +167,8 @@ message OMRequest {
 
   optional ListTrashRequest                 listTrashRequest               = 91;
   optional RecoverTrashRequest              RecoverTrashRequest            = 92;
+
+  optional PurgePathsRequest                purgePathsRequest              = 93;
 }
 
 message OMResponse {
@@ -235,6 +239,7 @@ message OMResponse {
 
   optional ListTrashResponse                  listTrashResponse            = 91;
   optional RecoverTrashResponse               RecoverTrashResponse         = 92;
+  optional PurgePathsResponse                 purgePathsResponse           = 93;
 }
 
 enum Status {
@@ -956,6 +961,16 @@ message PurgeKeysResponse {
 
 }
 
+message PurgePathsRequest {
+    repeated string deletedDirs = 1;
+    repeated KeyInfo deletedSubFiles = 2;
+    repeated KeyInfo markDeletedSubDirs = 3;
+}
+
+message PurgePathsResponse {
+
+}
+
 message DeleteOpenKeysRequest {
   repeated OpenKeyBucket openKeysPerBucket = 1;
 }
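
For illustration only (not part of this patch): a condensed sketch of how the new
PurgePathsRequest message is populated, in line with the DirectoryDeletingService
code added later in this commit. The helper method and its parameters
(deletedDirDbKey, subFileKeyInfo, subDirKeyInfo) are hypothetical placeholders.

    import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
    import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgePathsRequest;

    // Hypothetical helper: builds a purge request from one processed deleted dir.
    static PurgePathsRequest buildPurgePathsRequest(String deletedDirDbKey,
        KeyInfo subFileKeyInfo, KeyInfo subDirKeyInfo) {
      return PurgePathsRequest.newBuilder()
          .addDeletedDirs(deletedDirDbKey)       // removed from deletedDirTable
          .addDeletedSubFiles(subFileKeyInfo)    // moved to deletedTable
          .addMarkDeletedSubDirs(subDirKeyInfo)  // moved to deletedDirTable
          .build();
    }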
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index 4c66040..bcbef0c 100644
--- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -436,4 +436,12 @@ public interface OMMetadataManager extends DBStoreHAManager {
    * @return bytes of DB key.
    */
   String getMultipartKey(long parentObjectId, String fileName, String uploadId);
+
+  /**
+   * Get Deleted Directory Table.
+   *
+   * @return Deleted Directory Table.
+   */
+  Table<String, OmKeyInfo> getDeletedDirTable();
+
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DirectoryDeletingService.java
new file mode 100644
index 0000000..ec5c3a9
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DirectoryDeletingService.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT;
+
+/**
+ * This is a background service to delete orphan directories and their
+ * sub-paths (sub-dirs and sub-files).
+ *
+ * <p>
+ * This service periodically scans the OM metadata to get orphan dirs from
+ * the DeletedDirectoryTable and find their sub-paths. It fetches all sub-files
+ * from the KeyTable and moves them to the DeletedTable so that OM's
+ * KeyDeletingService will clean up those files later. It fetches all
+ * sub-directories from the DirectoryTable and moves them to the
+ * DeletedDirectoryTable so that they will be visited in later iterations.
+ *
+ * <p>
+ * After moving all sub-files and sub-dirs, the parent orphan directory is
+ * deleted by this service. It continues traversing until all the leaf path
+ * components of an orphan directory are visited.
+ */
+public class DirectoryDeletingService extends BackgroundService {
+
+  private final KeyManager keyManager;
+  private final OzoneManager ozoneManager;
+  private AtomicLong deletedDirsCount;
+  private AtomicLong deletedFilesCount;
+  private final AtomicLong runCount;
+
+  private static ClientId clientId = ClientId.randomId();
+
+  // Use only a single thread for DirDeletion. Multiple threads would read
+  // from or write to the same tables and could send deletion requests for
+  // the same key multiple times.
+  private static final int DIR_DELETING_CORE_POOL_SIZE = 1;
+
+  // Number of items(dirs/files) to be batched in an iteration.
+  private final long pathLimitPerTask;
+
+  public DirectoryDeletingService(long interval, TimeUnit unit,
+      long serviceTimeout, OzoneManager ozoneManager) {
+    super("DirectoryDeletingService", interval, unit,
+        DIR_DELETING_CORE_POOL_SIZE, serviceTimeout);
+    this.keyManager = ozoneManager.getKeyManager();
+    this.ozoneManager = ozoneManager;
+    this.deletedDirsCount = new AtomicLong(0);
+    this.deletedFilesCount = new AtomicLong(0);
+    this.runCount = new AtomicLong(0);
+    this.pathLimitPerTask = ozoneManager.getConfiguration()
+        .getInt(OZONE_PATH_DELETING_LIMIT_PER_TASK,
+            OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT);
+  }
+
+  private boolean shouldRun() {
+    if (ozoneManager == null) {
+      // OzoneManager can be null for testing
+      return true;
+    }
+    return ozoneManager.isLeaderReady();
+  }
+
+  private boolean isRatisEnabled() {
+    if (ozoneManager == null) {
+      return false;
+    }
+    return ozoneManager.isRatisEnabled();
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new DirectoryDeletingService.DirDeletingTask());
+    return queue;
+  }
+
+  private class DirDeletingTask implements BackgroundTask {
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    @Override
+    public BackgroundTaskResult call() throws Exception {
+      if (shouldRun()) {
+        runCount.incrementAndGet();
+        long count = pathLimitPerTask;
+        try {
+          long startTime = Time.monotonicNow();
+          // step-1: Get one pending deleted directory
+          OmKeyInfo pendingDeletedDirInfo = keyManager.getPendingDeletionDir();
+          if (pendingDeletedDirInfo != null) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Pending deleted dir name: {}",
+                  pendingDeletedDirInfo.getKeyName());
+            }
+            // step-2: get all sub-directories under the deletedDir
+            List<OmKeyInfo> dirs =
+                keyManager.getPendingDeletionSubDirs(pendingDeletedDirInfo,
+                    count);
+            count = count - dirs.size();
+            List<OmKeyInfo> deletedSubDirList = new ArrayList<>();
+            for (OmKeyInfo dirInfo : dirs) {
+              deletedSubDirList.add(dirInfo);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("deleted sub dir name: {}",
+                    dirInfo.getKeyName());
+              }
+            }
+
+            // step-3: get all sub-files under the deletedDir
+            List<OmKeyInfo> purgeDeletedFiles =
+                keyManager.getPendingDeletionSubFiles(pendingDeletedDirInfo,
+                    count);
+            count = count - purgeDeletedFiles.size();
+
+            if (LOG.isDebugEnabled()) {
+              for (OmKeyInfo fileInfo : purgeDeletedFiles) {
+                LOG.debug("deleted sub file name: {}", fileInfo.getKeyName());
+              }
+            }
+
+            // step-4: Since there is a boundary condition of 'numEntries' in
+            // each batch, check whether the sub-path count reached the batch
+            // size limit. If the count reached the limit, there can be more
+            // child paths to be visited, so keep the parent deleted directory
+            // for one more pass.
+            List<String> purgeDeletedDirs = new ArrayList<>();
+            if (count > 0) {
+              // TODO: For now, there is only one entry in this list. A list
+              //  data structure is maintained because this can be extended
+              //  to add more directories within the batchSize limit.
+              purgeDeletedDirs.add(pendingDeletedDirInfo.getPath());
+            }
+
+            if (isRatisEnabled()) {
+              submitPurgePaths(purgeDeletedDirs, purgeDeletedFiles,
+                  deletedSubDirList);
+            }
+            // TODO: need to handle delete with non-ratis
+
+            deletedDirsCount.addAndGet(purgeDeletedDirs.size());
+            deletedFilesCount.addAndGet(purgeDeletedFiles.size());
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Number of dirs deleted: {}, Number of files moved:" +
+                      " {} to DeletedTable, elapsed time: {}ms",
+                  deletedDirsCount, deletedFilesCount,
+                  Time.monotonicNow() - startTime);
+            }
+          }
+        } catch (IOException e) {
+          LOG.error("Error while running delete directories and files " +
+              "background task. Will retry at next run.", e);
+        }
+      }
+
+      // Placeholder: return an empty result for this callback.
+      return BackgroundTaskResult.EmptyTaskResult.newResult();
+    }
+  }
+
+  /**
+   * Returns the number of dirs deleted by the background service.
+   *
+   * @return Long count.
+   */
+  @VisibleForTesting
+  public long getDeletedDirsCount() {
+    return deletedDirsCount.get();
+  }
+
+  /**
+   * Returns the number of files moved to DeletedTable by the background
+   * service.
+   *
+   * @return Long count.
+   */
+  @VisibleForTesting
+  public long getMovedFilesCount() {
+    return deletedFilesCount.get();
+  }
+
+  /**
+   * Returns the number of times this Background service has run.
+   *
+   * @return Long, run count.
+   */
+  @VisibleForTesting
+  public long getRunCount() {
+    return runCount.get();
+  }
+
+  private int submitPurgePaths(List<String> purgeDeletedDirs,
+      List<OmKeyInfo> purgeDeletedFiles, List<OmKeyInfo> markDirsAsDeleted) {
+    // Put all paths to be purged into the request.
+    int deletedCount = 0;
+    OzoneManagerProtocolProtos.PurgePathsRequest.Builder purgePathsRequest =
+        OzoneManagerProtocolProtos.PurgePathsRequest.newBuilder();
+    for (String purgeDir : purgeDeletedDirs) {
+      purgePathsRequest.addDeletedDirs(purgeDir);
+    }
+    for (OmKeyInfo purgeFile : purgeDeletedFiles) {
+      purgePathsRequest.addDeletedSubFiles(
+          purgeFile.getProtobuf(CURRENT_VERSION));
+    }
+
+    // Add these directories to the deletedDirTable, so that their sub-paths
+    // will be traversed in the next iteration to clean up all sub-children.
+    for (OmKeyInfo dir : markDirsAsDeleted) {
+      purgePathsRequest.addMarkDeletedSubDirs(dir.getProtobuf(CURRENT_VERSION));
+    }
+
+    OzoneManagerProtocolProtos.OMRequest omRequest =
+        OzoneManagerProtocolProtos.OMRequest.newBuilder()
+            .setCmdType(OzoneManagerProtocolProtos.Type.PurgePaths)
+            .setPurgePathsRequest(purgePathsRequest)
+            .setClientId(clientId.toString())
+            .build();
+
+    // Submit Purge paths request to OM
+    try {
+      RaftClientRequest raftClientRequest =
+          createRaftClientRequestForDelete(omRequest);
+      ozoneManager.getOmRatisServer().submitRequest(omRequest,
+          raftClientRequest);
+    } catch (ServiceException e) {
+      LOG.error("PurgePaths request failed. Will retry at next run.");
+      return 0;
+    }
+    return deletedCount;
+  }
+
+
+  private RaftClientRequest createRaftClientRequestForDelete(
+      OzoneManagerProtocolProtos.OMRequest omRequest) {
+    return RaftClientRequest.newBuilder()
+        .setClientId(clientId)
+        .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
+        .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
+        .setCallId(runCount.get())
+        .setMessage(
+            Message.valueOf(
+                OMRatisHelper.convertRequestToByteString(omRequest)))
+        .setType(RaftClientRequest.writeRequestType())
+        .build();
+  }
+
+}
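
For illustration only (not part of this patch): a worked example of the per-run
path accounting in DirDeletingTask above, assuming the default
ozone.path.deleting.limit.per.task of 10000 and a pending deleted directory with
7 immediate sub-dirs and 25 immediate sub-files.

    long count = 10000;                  // pathLimitPerTask
    count -= 7;                          // sub-dirs moved to deletedDirTable -> 9993
    count -= 25;                         // sub-files moved to deletedTable   -> 9968
    boolean purgeParentNow = count > 0;  // true: all children fit in this batch,
                                         // so the parent dir itself is purged;
                                         // otherwise it is kept for one more pass.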
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index 658f503..b569b5d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -273,4 +273,60 @@ public interface KeyManager extends OzoneManagerFS, IOzoneAcl {
    * @param key
    */
   void refresh(OmKeyInfo key) throws IOException;
+
+  /**
+   * Assume OM has an FS namespace like below; the deletedDirTable stores the
+   * absolute path name because the existing KeyDeletingService expects the
+   * full key name. For example, if a user deletes directory 'd3' then the
+   * OM DB entry is: DBKey = 1030/d3, DBValue = KeyInfo with keyName "a/b2/d3"
+   *
+   *                   vol1
+   *                    |
+   *                  buck-1
+   *                    |
+   *                    a
+   *                    |
+   *      -----------------------------------
+   *     |             |                     |
+   *     b1            b2                    b3
+   *   -----       ---------               ----------
+   *   |    |      |    |   |             |    |     |
+   *  c1   c2     d1   d2  d3             e1   e2   e3
+   *                   |                  |
+   *               --------               |
+   *              |        |              |
+   *           d21.txt   d22.txt        e11.txt
+   *
+   * @return OmKeyInfo
+   * @throws IOException
+   */
+  OmKeyInfo getPendingDeletionDir() throws IOException;
+
+  /**
+   * Returns immediate sub-directories of the given parent, up to numEntries.
+   *
+   * @param parentInfo
+   * @param numEntries
+   * @return list of dirs
+   * @throws IOException
+   */
+  List<OmKeyInfo> getPendingDeletionSubDirs(OmKeyInfo parentInfo,
+      long numEntries) throws IOException;
+
+  /**
+   * Returns immediate sub-files of the given parent, up to numEntries.
+   *
+   * @param parentInfo
+   * @param numEntries
+   * @return list of files
+   * @throws IOException
+   */
+  List<OmKeyInfo> getPendingDeletionSubFiles(OmKeyInfo parentInfo,
+      long numEntries) throws IOException;
+
+  /**
+   * Returns the instance of Directory Deleting Service.
+   * @return Background service.
+   */
+  BackgroundService getDirDeletingService();
 }
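
For illustration only (not part of this patch): a minimal sketch of how the new
KeyManager methods fit together for one cleanup pass, as driven by the
DirectoryDeletingService shown earlier. A KeyManager instance 'keyManager' and a
batch limit 'limit' are assumed to be in scope.

    OmKeyInfo pendingDir = keyManager.getPendingDeletionDir();
    if (pendingDir != null) {
      // Immediate sub-dirs: destined for the deletedDirTable (next pass).
      List<OmKeyInfo> subDirs =
          keyManager.getPendingDeletionSubDirs(pendingDir, limit);
      // Immediate sub-files: destined for the deletedTable, where the
      // KeyDeletingService will delete their blocks.
      List<OmKeyInfo> subFiles =
          keyManager.getPendingDeletionSubFiles(pendingDir, limit);
      // A PurgePaths request built from these is then submitted through Ratis.
    }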
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 422a915..38a4307 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -134,6 +134,8 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAU
 import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DIRECTORY_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
@@ -176,6 +178,7 @@ public class KeyManagerImpl implements KeyManager {
   private final PrefixManager prefixManager;
 
   private final boolean enableFileSystemPaths;
+  private BackgroundService dirDeletingService;
 
 
   @VisibleForTesting
@@ -250,6 +253,22 @@ public class KeyManagerImpl implements KeyManager {
           serviceTimeout, configuration);
       keyDeletingService.start();
     }
+
+    // Start directory deletion service for FSO buckets.
+    if (OzoneManagerRatisUtils.isBucketFSOptimized()
+        && dirDeletingService == null) {
+      long dirDeleteInterval = configuration.getTimeDuration(
+          OZONE_DIR_DELETING_SERVICE_INTERVAL,
+          OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT,
+          TimeUnit.MILLISECONDS);
+      long serviceTimeout = configuration.getTimeDuration(
+          OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
+          OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
+          TimeUnit.MILLISECONDS);
+      dirDeletingService = new DirectoryDeletingService(dirDeleteInterval,
+          TimeUnit.SECONDS, serviceTimeout, ozoneManager);
+      dirDeletingService.start();
+    }
   }
 
   KeyProviderCryptoExtension getKMSProvider() {
@@ -262,6 +281,10 @@ public class KeyManagerImpl implements KeyManager {
       keyDeletingService.shutdown();
       keyDeletingService = null;
     }
+    if (dirDeletingService != null) {
+      dirDeletingService.shutdown();
+      dirDeletingService = null;
+    }
   }
 
   private OmBucketInfo getBucketInfo(String volumeName, String bucketName)
@@ -980,6 +1003,11 @@ public class KeyManagerImpl implements KeyManager {
   }
 
   @Override
+  public BackgroundService getDirDeletingService() {
+    return dirDeletingService;
+  }
+
+  @Override
   public OmMultipartInfo initiateMultipartUpload(OmKeyArgs omKeyArgs) throws
       IOException {
     Preconditions.checkNotNull(omKeyArgs);
@@ -2894,4 +2922,88 @@ public class KeyManagerImpl implements KeyManager {
     }
     return nodeSet;
   }
+
+  @Override
+  public OmKeyInfo getPendingDeletionDir() throws IOException {
+    OmKeyInfo omKeyInfo = null;
+    try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
+             deletedDirItr = metadataManager.getDeletedDirTable().iterator()) {
+      if (deletedDirItr.hasNext()) {
+        Table.KeyValue<String, OmKeyInfo> keyValue = deletedDirItr.next();
+        if (keyValue != null) {
+          omKeyInfo = keyValue.getValue();
+        }
+      }
+    }
+    return omKeyInfo;
+  }
+
+  @Override
+  public List<OmKeyInfo> getPendingDeletionSubDirs(OmKeyInfo parentInfo,
+      long numEntries) throws IOException {
+    List<OmKeyInfo> directories = new ArrayList<>();
+    String seekDirInDB = metadataManager.getOzonePathKey(
+        parentInfo.getObjectID(), "");
+    long countEntries = 0;
+
+    Table dirTable = metadataManager.getDirectoryTable();
+    TableIterator<String, ? extends Table.KeyValue<String, OmDirectoryInfo>>
+        iterator = dirTable.iterator();
+
+    iterator.seek(seekDirInDB);
+
+    while (iterator.hasNext() && numEntries - countEntries > 0) {
+      OmDirectoryInfo dirInfo = iterator.value().getValue();
+      if (!OMFileRequest.isImmediateChild(dirInfo.getParentObjectID(),
+          parentInfo.getObjectID())) {
+        break;
+      }
+      String dirName = OMFileRequest.getAbsolutePath(parentInfo.getKeyName(),
+          dirInfo.getName());
+      OmKeyInfo omKeyInfo = OMFileRequest.getOmKeyInfo(
+          parentInfo.getVolumeName(), parentInfo.getBucketName(), dirInfo,
+          dirName);
+      directories.add(omKeyInfo);
+      countEntries++;
+
+      // move to next entry in the DirTable
+      iterator.next();
+    }
+
+    return directories;
+  }
+
+  @Override
+  public List<OmKeyInfo> getPendingDeletionSubFiles(OmKeyInfo parentInfo,
+      long numEntries) throws IOException {
+    List<OmKeyInfo> files = new ArrayList<>();
+    String seekFileInDB = metadataManager.getOzonePathKey(
+        parentInfo.getObjectID(), "");
+    long countEntries = 0;
+
+    Table fileTable = metadataManager.getKeyTable();
+    TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
+        iterator = fileTable.iterator();
+
+    iterator.seek(seekFileInDB);
+
+    while (iterator.hasNext() && numEntries - countEntries > 0) {
+      OmKeyInfo fileInfo = iterator.value().getValue();
+      if (!OMFileRequest.isImmediateChild(fileInfo.getParentObjectID(),
+          parentInfo.getObjectID())) {
+        break;
+      }
+      fileInfo.setFileName(fileInfo.getKeyName());
+      String fullKeyPath = OMFileRequest.getAbsolutePath(
+          parentInfo.getKeyName(), fileInfo.getKeyName());
+      fileInfo.setKeyName(fullKeyPath);
+
+      files.add(fileInfo);
+      countEntries++;
+      // move to next entry in the KeyTable
+      iterator.next();
+    }
+
+    return files;
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 4e4f91b..b67346e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -137,6 +137,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
    * |----------------------------------------------------------------------|
    * |  multipartFileInfoTable | parentId/fileName/uploadId ->...           |
    * |----------------------------------------------------------------------|
+   * |  deletedDirTable      | parentId/directoryName -> KeyInfo            |
+   * |----------------------------------------------------------------------|
    * |  transactionInfoTable | #TRANSACTIONINFO -> OMTransactionInfo        |
    * |----------------------------------------------------------------------|
    */
@@ -155,6 +157,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
   public static final String FILE_TABLE = "fileTable";
   public static final String OPEN_FILE_TABLE = "openFileTable";
   public static final String MULTIPARTFILEINFO_TABLE = "multipartFileInfoTable";
+  public static final String DELETED_DIR_TABLE = "deletedDirectoryTable";
   public static final String TRANSACTION_INFO_TABLE =
       "transactionInfoTable";
 
@@ -180,6 +183,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
   private boolean isRatisEnabled;
   private boolean ignorePipelineinKey;
   private Table<String, OmMultipartKeyInfo> multipartFileInfoTable;
+  private Table deletedDirTable;
 
   // Epoch is used to generate the objectIDs. The most significant 2 bits of
   // objectIDs is set to this epoch. For clusters before HDDS-4315 there is
@@ -256,6 +260,11 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
   }
 
   @Override
+  public Table<String, OmKeyInfo> getDeletedDirTable() {
+    return deletedDirTable;
+  }
+
+  @Override
   public Table<String, OmKeyInfo> getOpenKeyTable() {
     if (OzoneManagerRatisUtils.isBucketFSOptimized()) {
       return openFileTable;
@@ -372,6 +381,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
         .addTable(FILE_TABLE)
         .addTable(OPEN_FILE_TABLE)
         .addTable(MULTIPARTFILEINFO_TABLE)
+        .addTable(DELETED_DIR_TABLE)
         .addTable(TRANSACTION_INFO_TABLE)
         .addCodec(OzoneTokenIdentifier.class, new TokenIdentifierCodec())
         .addCodec(OmKeyInfo.class, new OmKeyInfoCodec(true))
@@ -454,6 +464,10 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
             String.class, OmMultipartKeyInfo.class);
     checkTableStatus(multipartFileInfoTable, MULTIPARTFILEINFO_TABLE);
 
+    deletedDirTable = this.store.getTable(DELETED_DIR_TABLE, String.class,
+        OmKeyInfo.class);
+    checkTableStatus(deletedDirTable, DELETED_DIR_TABLE);
+
     transactionInfoTable = this.store.getTable(TRANSACTION_INFO_TABLE,
         String.class, TransactionInfo.class);
     checkTableStatus(transactionInfoTable, TRANSACTION_INFO_TABLE);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
index 77b9e04..f3716db 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
@@ -180,6 +180,12 @@ public class OMDBDefinition implements DBDefinition {
                   OmMultipartKeyInfo.class,
                   new OmMultipartKeyInfoCodec());
 
+  public static final DBColumnFamilyDefinition<String, OmKeyInfo>
+      DELETED_DIR_TABLE =
+      new DBColumnFamilyDefinition<>(OmMetadataManagerImpl.DELETED_DIR_TABLE,
+          String.class, new StringCodec(), OmKeyInfo.class,
+          new OmKeyInfoCodec(true));
+
   @Override
   public String getName() {
     return OzoneConsts.OM_DB_NAME;
@@ -196,7 +202,8 @@ public class OMDBDefinition implements DBDefinition {
         VOLUME_TABLE, OPEN_KEY_TABLE, KEY_TABLE,
         BUCKET_TABLE, MULTIPART_INFO_TABLE, PREFIX_TABLE, DTOKEN_TABLE,
         S3_SECRET_TABLE, TRANSACTION_INFO_TABLE, DIRECTORY_TABLE,
-        FILE_TABLE, OPEN_FILE_TABLE, MULTIPART_FILEINFO_TABLE};
+        FILE_TABLE, OPEN_FILE_TABLE, MULTIPART_FILEINFO_TABLE,
+        DELETED_DIR_TABLE};
   }
 }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index b3e6c4f..f4628a9 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.ozone.om.request.key.OMKeyPurgeRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyRenameRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyRenameRequestV1;
 import org.apache.hadoop.ozone.om.request.key.OMKeysRenameRequest;
+import org.apache.hadoop.ozone.om.request.key.OMPathsPurgeRequestV1;
 import org.apache.hadoop.ozone.om.request.key.OMTrashRecoverRequest;
 import org.apache.hadoop.ozone.om.request.key.acl.OMKeyAddAclRequest;
 import org.apache.hadoop.ozone.om.request.key.acl.OMKeyAddAclRequestV1;
@@ -184,6 +185,8 @@ public final class OzoneManagerRatisUtils {
       return new OMFileCreateRequest(omRequest);
     case PurgeKeys:
       return new OMKeyPurgeRequest(omRequest);
+    case PurgePaths:
+      return new OMPathsPurgeRequestV1(omRequest);
     case InitiateMultiPartUpload:
       if (isBucketFSOptimized()) {
         return new S3InitiateMultipartUploadRequestV1(omRequest);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequestV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequestV1.java
index dbf5645..87427f8 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequestV1.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequestV1.java
@@ -160,9 +160,9 @@ public class OMKeyDeleteRequestV1 extends OMKeyDeleteRequest {
       // TODO: Revisit if we need it later.
 
       omClientResponse = new OMKeyDeleteResponseV1(omResponse
-              .setDeleteKeyResponse(DeleteKeyResponse.newBuilder()).build(),
-              omKeyInfo, ozoneManager.isRatisEnabled(),
-              omBucketInfo.copyObject(), keyStatus.isDirectory());
+          .setDeleteKeyResponse(DeleteKeyResponse.newBuilder()).build(),
+          keyName, omKeyInfo, ozoneManager.isRatisEnabled(),
+          omBucketInfo.copyObject(), keyStatus.isDirectory());
 
       result = Result.SUCCESS;
     } catch (IOException ex) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMPathsPurgeRequestV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMPathsPurgeRequestV1.java
new file mode 100644
index 0000000..b9d6066
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMPathsPurgeRequestV1.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.key;
+
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.key.OMPathsPurgeResponseV1;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+
+import java.util.List;
+
+/**
+ * Handles purging of deleted paths (dirs/files) from the OM DB.
+ */
+public class OMPathsPurgeRequestV1 extends OMKeyRequest {
+
+  public OMPathsPurgeRequestV1(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
+    OzoneManagerProtocolProtos.PurgePathsRequest purgePathsRequest =
+        getOmRequest().getPurgePathsRequest();
+
+    List<String> deletedDirsList = purgePathsRequest.getDeletedDirsList();
+    List<OzoneManagerProtocolProtos.KeyInfo> deletedSubFilesList =
+        purgePathsRequest.getDeletedSubFilesList();
+    List<OzoneManagerProtocolProtos.KeyInfo> markDeletedSubDirsList =
+        purgePathsRequest.getMarkDeletedSubDirsList();
+
+    OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
+        getOmRequest());
+
+    OMClientResponse omClientResponse = new OMPathsPurgeResponseV1(
+        omResponse.build(), markDeletedSubDirsList, deletedSubFilesList,
+        deletedDirsList, ozoneManager.isRatisEnabled());
+    addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
+        omDoubleBufferHelper);
+
+    return omClientResponse;
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponseV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponseV1.java
index 15c1ba6..69e87df 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponseV1.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponseV1.java
@@ -40,12 +40,14 @@ import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE;
 public class OMKeyDeleteResponseV1 extends OMKeyDeleteResponse {
 
   private boolean isDeleteDirectory;
+  private String keyName;
 
   public OMKeyDeleteResponseV1(@Nonnull OMResponse omResponse,
-      @Nonnull OmKeyInfo omKeyInfo, boolean isRatisEnabled,
-      @Nonnull OmBucketInfo omBucketInfo,
+      @Nonnull String keyName, @Nonnull OmKeyInfo omKeyInfo,
+      boolean isRatisEnabled, @Nonnull OmBucketInfo omBucketInfo,
       @Nonnull boolean isDeleteDirectory) {
     super(omResponse, omKeyInfo, isRatisEnabled, omBucketInfo);
+    this.keyName = keyName;
     this.isDeleteDirectory = isDeleteDirectory;
   }
 
@@ -69,6 +71,12 @@ public class OMKeyDeleteResponseV1 extends OMKeyDeleteResponse {
     if (isDeleteDirectory) {
       omMetadataManager.getDirectoryTable().deleteWithBatch(batchOperation,
               ozoneDbKey);
+      OmKeyInfo omKeyInfo = getOmKeyInfo();
+      // Set the full absolute key name on the OmKeyInfo; this is
+      // required for moving the sub-files to the KeyDeletingService.
+      omKeyInfo.setKeyName(keyName);
+      omMetadataManager.getDeletedDirTable().putWithBatch(
+          batchOperation, ozoneDbKey, omKeyInfo);
     } else {
       Table<String, OmKeyInfo> keyTable = omMetadataManager.getKeyTable();
       addDeletionToBatch(omMetadataManager, batchOperation, keyTable,
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMPathsPurgeResponseV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMPathsPurgeResponseV1.java
new file mode 100644
index 0000000..b6f5299
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMPathsPurgeResponseV1.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.key;
+
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.request.key.OMPathsPurgeRequestV1;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_DIR_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DIRECTORY_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE;
+
+/**
+ * Response for {@link OMPathsPurgeRequestV1} request.
+ */
+@CleanupTableInfo(cleanupTables = {DELETED_TABLE, DELETED_DIR_TABLE,
+    DIRECTORY_TABLE, FILE_TABLE})
+public class OMPathsPurgeResponseV1 extends OMClientResponse {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMPathsPurgeResponseV1.class);
+
+  private List<OzoneManagerProtocolProtos.KeyInfo> markDeletedDirList;
+  private List<String> dirList;
+  private List<OzoneManagerProtocolProtos.KeyInfo> fileList;
+  private boolean isRatisEnabled;
+
+
+  public OMPathsPurgeResponseV1(@Nonnull OMResponse omResponse,
+      @Nonnull List<OzoneManagerProtocolProtos.KeyInfo> markDeletedDirs,
+      @Nonnull List<OzoneManagerProtocolProtos.KeyInfo> files,
+      @Nonnull List<String> dirs, boolean isRatisEnabled) {
+    super(omResponse);
+    this.markDeletedDirList = markDeletedDirs;
+    this.dirList = dirs;
+    this.fileList = files;
+    this.isRatisEnabled = isRatisEnabled;
+  }
+
+  @Override
+  public void addToDBBatch(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+
+    // Add all sub-directories to deleted directory table.
+    for (OzoneManagerProtocolProtos.KeyInfo key : markDeletedDirList) {
+      OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf(key);
+      String ozoneDbKey = omMetadataManager.getOzonePathKey(
+          keyInfo.getParentObjectID(), keyInfo.getFileName());
+      omMetadataManager.getDeletedDirTable().putWithBatch(batchOperation,
+          ozoneDbKey, keyInfo);
+
+      omMetadataManager.getDirectoryTable().deleteWithBatch(batchOperation,
+          ozoneDbKey);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("markDeletedDirList KeyName: {}, DBKey: {}",
+            keyInfo.getKeyName(), ozoneDbKey);
+      }
+    }
+
+    // Delete all the visited directories from deleted directory table
+    for (String key : dirList) {
+      omMetadataManager.getDeletedDirTable().deleteWithBatch(batchOperation,
+          key);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Purge Deleted Directory DBKey: {}", key);
+      }
+    }
+    for (OzoneManagerProtocolProtos.KeyInfo key : fileList) {
+      OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf(key);
+      String ozoneDbKey = omMetadataManager.getOzonePathKey(
+          keyInfo.getParentObjectID(), keyInfo.getFileName());
+      omMetadataManager.getKeyTable().deleteWithBatch(batchOperation,
+          ozoneDbKey);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Move keyName:{} to DeletedTable DBKey: {}",
+            keyInfo.getKeyName(), ozoneDbKey);
+      }
+
+      RepeatedOmKeyInfo repeatedOmKeyInfo =
+          OmUtils.prepareKeyForDelete(keyInfo, null,
+              keyInfo.getUpdateID(), isRatisEnabled);
+
+      omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
+          keyInfo.getPath(), repeatedOmKeyInfo);
+
+    }
+  }
+
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponseV1.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponseV1.java
index d46fe72..4422527 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponseV1.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponseV1.java
@@ -36,8 +36,8 @@ public class TestOMKeyDeleteResponseV1 extends TestOMKeyDeleteResponse {
   @Override
   protected OMKeyDeleteResponse getOmKeyDeleteResponse(OmKeyInfo omKeyInfo,
       OzoneManagerProtocolProtos.OMResponse omResponse) {
-    return new OMKeyDeleteResponseV1(omResponse, omKeyInfo,
-            true, getOmBucketInfo(), false);
+    return new OMKeyDeleteResponseV1(omResponse, omKeyInfo.getKeyName(),
+        omKeyInfo, true, getOmBucketInfo(), false);
   }
 
   @Override
