You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ra...@apache.org on 2021/01/06 08:23:38 UTC

[ozone] 05/08: HDDS-4358: Delete : make delete an atomic operation (#1607)

This is an automated email from the ASF dual-hosted git repository.

rakeshr pushed a commit to branch HDDS-2939
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 369d3a382f879347979af361a50761e9f841786e
Author: Rakesh Radhakrishnan <ra...@apache.org>
AuthorDate: Mon Dec 7 13:16:36 2020 +0530

    HDDS-4358: Delete : make delete an atomic operation (#1607)
---
 .../apache/hadoop/ozone/client/OzoneBucket.java    |  16 +-
 .../ozone/client/protocol/ClientProtocol.java      |   5 +-
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |   3 +-
 .../hadoop/ozone/om/exceptions/OMException.java    |   4 +-
 .../apache/hadoop/ozone/om/helpers/OmKeyArgs.java  |  16 +-
 ...OzoneManagerProtocolClientSideTranslatorPB.java |   3 +-
 .../hadoop/fs/ozone/TestOzoneFileSystem.java       |  47 +++--
 .../hadoop/fs/ozone/TestOzoneFileSystemV1.java     | 214 +++++----------------
 .../src/main/proto/OmClientProtocol.proto          |   4 +
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  10 +-
 .../om/ratis/utils/OzoneManagerRatisUtils.java     |   4 +
 .../ozone/om/request/file/OMFileRequest.java       |  92 +++++++++
 .../ozone/om/request/key/OMKeyDeleteRequestV1.java | 202 +++++++++++++++++++
 .../ozone/om/request/key/OMKeyRenameRequestV1.java |   3 +-
 .../ozone/om/response/key/OMKeyDeleteResponse.java |   8 +
 ...eteResponse.java => OMKeyDeleteResponseV1.java} |  49 ++---
 .../ozone/om/request/TestOMRequestUtils.java       |   3 +
 .../om/request/key/TestOMKeyDeleteRequest.java     |  40 ++--
 .../om/request/key/TestOMKeyDeleteRequestV1.java   |  57 ++++++
 .../om/response/key/TestOMKeyDeleteResponse.java   |  89 +++++----
 .../om/response/key/TestOMKeyDeleteResponseV1.java |  70 +++++++
 .../fs/ozone/BasicOzoneClientAdapterImpl.java      |  26 ++-
 .../hadoop/fs/ozone/BasicOzoneFileSystem.java      |  26 ++-
 .../ozone/BasicRootedOzoneClientAdapterImpl.java   |  19 +-
 .../apache/hadoop/fs/ozone/OzoneClientAdapter.java |   4 +-
 25 files changed, 728 insertions(+), 286 deletions(-)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index f688a66..c1877b4 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -553,7 +553,21 @@ public class OzoneBucket extends WithMetadata {
    * @throws IOException
    */
   public void deleteKey(String key) throws IOException {
-    proxy.deleteKey(volumeName, name, key);
+    proxy.deleteKey(volumeName, name, key, false);
+  }
+
+  /**
+   * Ozone FS api to delete a directory. Sub directories will be deleted if
+   * recursive flag is true, otherwise it will be non-recursive.
+   *
+   * @param key       Name of the key to be deleted.
+   * @param recursive recursive deletion of all sub path keys if true,
+   *                  otherwise non-recursive
+   * @throws IOException
+   */
+  public void deleteDirectory(String key, boolean recursive)
+      throws IOException {
+    proxy.deleteKey(volumeName, name, key, recursive);
   }
 
   /**
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index 863a109..2f1c5af 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -289,9 +289,12 @@ public interface ClientProtocol {
    * @param volumeName Name of the Volume
    * @param bucketName Name of the Bucket
    * @param keyName Name of the Key
+   * @param recursive recursive deletion of all sub path keys if true,
+   *                  otherwise non-recursive
    * @throws IOException
    */
-  void deleteKey(String volumeName, String bucketName, String keyName)
+  void deleteKey(String volumeName, String bucketName, String keyName,
+                 boolean recursive)
       throws IOException;
 
   /**
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 28f9a01..9f9d845 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -735,7 +735,7 @@ public class RpcClient implements ClientProtocol {
 
   @Override
   public void deleteKey(
-      String volumeName, String bucketName, String keyName)
+      String volumeName, String bucketName, String keyName, boolean recursive)
       throws IOException {
     verifyVolumeName(volumeName);
     verifyBucketName(bucketName);
@@ -744,6 +744,7 @@ public class RpcClient implements ClientProtocol {
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
         .setKeyName(keyName)
+        .setRecursive(recursive)
         .build();
     ozoneManagerClient.deleteKey(keyArgs);
   }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
index b676bca..bba97cd 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
@@ -233,7 +233,9 @@ public class OMException extends IOException {
 
     QUOTA_EXCEEDED,
 
-    QUOTA_ERROR
+    QUOTA_ERROR,
+
+    DIRECTORY_NOT_EMPTY
 
   }
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
index c08c988..f8c7c23 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
@@ -48,6 +48,7 @@ public final class OmKeyArgs implements Auditable {
   private boolean refreshPipeline;
   private boolean sortDatanodesInPipeline;
   private List<OzoneAcl> acls;
+  private boolean recursive;
 
   @SuppressWarnings("parameternumber")
   private OmKeyArgs(String volumeName, String bucketName, String keyName,
@@ -55,7 +56,7 @@ public final class OmKeyArgs implements Auditable {
       List<OmKeyLocationInfo> locationInfoList, boolean isMultipart,
       String uploadID, int partNumber,
       Map<String, String> metadataMap, boolean refreshPipeline,
-      List<OzoneAcl> acls, boolean sortDatanode) {
+      List<OzoneAcl> acls, boolean sortDatanode, boolean recursive) {
     this.volumeName = volumeName;
     this.bucketName = bucketName;
     this.keyName = keyName;
@@ -70,6 +71,7 @@ public final class OmKeyArgs implements Auditable {
     this.refreshPipeline = refreshPipeline;
     this.acls = acls;
     this.sortDatanodesInPipeline = sortDatanode;
+    this.recursive = recursive;
   }
 
   public boolean getIsMultipartKey() {
@@ -140,6 +142,10 @@ public final class OmKeyArgs implements Auditable {
     return sortDatanodesInPipeline;
   }
 
+  public boolean isRecursive() {
+    return recursive;
+  }
+
   @Override
   public Map<String, String> toAuditMap() {
     Map<String, String> auditMap = new LinkedHashMap<>();
@@ -198,6 +204,7 @@ public final class OmKeyArgs implements Auditable {
     private boolean refreshPipeline;
     private boolean sortDatanodesInPipeline;
     private List<OzoneAcl> acls;
+    private boolean recursive;
 
     public Builder setVolumeName(String volume) {
       this.volumeName = volume;
@@ -274,11 +281,16 @@ public final class OmKeyArgs implements Auditable {
       return this;
     }
 
+    public Builder setRecursive(boolean isRecursive) {
+      this.recursive = isRecursive;
+      return this;
+    }
+
     public OmKeyArgs build() {
       return new OmKeyArgs(volumeName, bucketName, keyName, dataSize, type,
           factor, locationInfoList, isMultipartKey, multipartUploadID,
           multipartUploadPartNumber, metadata, refreshPipeline, acls,
-          sortDatanodesInPipeline);
+          sortDatanodesInPipeline, recursive);
     }
 
   }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 7a3a38e..c913a2d 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -741,7 +741,8 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
     KeyArgs keyArgs = KeyArgs.newBuilder()
         .setVolumeName(args.getVolumeName())
         .setBucketName(args.getBucketName())
-        .setKeyName(args.getKeyName()).build();
+        .setKeyName(args.getKeyName())
+        .setRecursive(args.isRecursive()).build();
     req.setKeyArgs(keyArgs);
 
     OMRequest omRequest = createOMRequest(Type.DeleteKey)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
index bb203d5..3bfb4fa 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.fs.TrashPolicy;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
@@ -383,7 +384,7 @@ public class TestOzoneFileSystem {
   }
 
 
-  private void testRecursiveDelete() throws Exception {
+  protected void testRecursiveDelete() throws Exception {
     Path grandparent = new Path("/gdir1");
 
     for (int i = 1; i <= 10; i++) {
@@ -392,6 +393,24 @@ public class TestOzoneFileSystem {
       ContractTestUtils.touch(fs, child);
     }
 
+    // delete a dir with sub-file
+    try {
+      FileStatus[] parents = fs.listStatus(grandparent);
+      Assert.assertTrue(parents.length > 0);
+      fs.delete(parents[0].getPath(), false);
+      Assert.fail("Must throw exception as dir is not empty!");
+    } catch (PathIsNotEmptyDirectoryException pde) {
+      // expected
+    }
+
+    // delete a dir with sub-directory
+    try {
+      fs.delete(grandparent, false);
+      Assert.fail("Must throw exception as dir is not empty!");
+    } catch (PathIsNotEmptyDirectoryException pde) {
+      // expected
+    }
+
     // Delete the grandparent, which should delete all keys.
     fs.delete(grandparent, true);
 
@@ -454,7 +473,7 @@ public class TestOzoneFileSystem {
     }
   }
 
-  private void testFileDelete() throws Exception {
+  protected void testFileDelete() throws Exception {
     Path grandparent = new Path("/testBatchDelete");
     Path parent = new Path(grandparent, "parent");
     Path childFolder = new Path(parent, "childFolder");
@@ -786,12 +805,8 @@ public class TestOzoneFileSystem {
     final Path baPath = new Path(fs.getUri().toString() + "/b/a");
     fs.mkdirs(baPath);
 
-    try {
-      fs.rename(aSourcePath, bDestinPath);
-      Assert.fail("Should fail as new destination dir exists!");
-    } catch (FileAlreadyExistsException faee) {
-      // expected as new sub-path /b/a already exists.
-    }
+    Assert.assertFalse("New destin sub-path /b/a already exists",
+            fs.rename(aSourcePath, bDestinPath));
 
     // Case-5.b) Rename file from /a/b/c/file1 to /a.
     // Should be failed since /a/file1 exists.
@@ -805,12 +820,8 @@ public class TestOzoneFileSystem {
 
     final Path aDestinPath = new Path(fs.getUri().toString() + "/a");
 
-    try {
-      fs.rename(abcFile1, aDestinPath);
-      Assert.fail("Should fail as new destination file exists!");
-    } catch (FileAlreadyExistsException faee) {
-      // expected as new sub-path /a/file1 already exists.
-    }
+    Assert.assertFalse("New destin sub-path /b/a already exists",
+            fs.rename(abcFile1, aDestinPath));
   }
 
   /**
@@ -825,12 +836,8 @@ public class TestOzoneFileSystem {
     ContractTestUtils.touch(fs, file1Destin);
     Path abcRootPath = new Path(fs.getUri().toString() + "/a/b/c");
     fs.mkdirs(abcRootPath);
-    try {
-      fs.rename(abcRootPath, file1Destin);
-      Assert.fail("key already exists /root_dir/file1");
-    } catch (FileAlreadyExistsException faee) {
-      // expected
-    }
+    Assert.assertFalse("key already exists /root_dir/file1",
+            fs.rename(abcRootPath, file1Destin));
   }
 
   /**
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemV1.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemV1.java
index 21e2157..d66ec4d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemV1.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemV1.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.fs.ozone;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -26,15 +25,8 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
-import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
-import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.jetbrains.annotations.NotNull;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -48,11 +40,11 @@ import org.slf4j.LoggerFactory;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 
 /**
  * Ozone file system tests that are not covered by contract tests,
@@ -309,78 +301,6 @@ public class TestOzoneFileSystemV1 extends TestOzoneFileSystem {
   }
 
   /**
-   * Case-5) If new destin '/dst/source' exists then throws exception.
-   * If destination is a directory then rename source as sub-path of it.
-   * <p>
-   * For example: rename /a to /b will lead to /b/a. This new path should
-   * not exist.
-   */
-  protected void testRenameToNewSubDirShouldNotExist() throws Exception {
-    // Case-5.a) Rename directory from /a to /b.
-    // created /a
-    final Path aSourcePath = new Path(fs.getUri().toString() + "/a");
-    fs.mkdirs(aSourcePath);
-
-    // created /b
-    final Path bDestinPath = new Path(fs.getUri().toString() + "/b");
-    fs.mkdirs(bDestinPath);
-
-    // Add a sub-directory '/b/a' to '/b'. This is to verify that rename
-    // throws exception as new destin /b/a already exists.
-    final Path baPath = new Path(fs.getUri().toString() + "/b/a");
-    fs.mkdirs(baPath);
-
-    try {
-      fs.rename(aSourcePath, bDestinPath);
-      Assert.fail("Should fail as new destination dir exists!");
-    } catch (OMException ome) {
-      // expected as new sub-path /b/a already exists.
-      assertEquals(ome.getResult(), OMException.ResultCodes.KEY_ALREADY_EXISTS);
-    }
-
-    // Case-5.b) Rename file from /a/b/c/file1 to /a.
-    // Should be failed since /a/file1 exists.
-    final Path abcPath = new Path(fs.getUri().toString() + "/a/b/c");
-    fs.mkdirs(abcPath);
-    Path abcFile1 = new Path(abcPath, "/file1");
-    ContractTestUtils.touch(fs, abcFile1);
-
-    final Path aFile1 = new Path(fs.getUri().toString() + "/a/file1");
-    ContractTestUtils.touch(fs, aFile1);
-
-    final Path aDestinPath = new Path(fs.getUri().toString() + "/a");
-
-    try {
-      fs.rename(abcFile1, aDestinPath);
-      Assert.fail("Should fail as new destination file exists!");
-    } catch (OMException ome) {
-      // expected as new sub-path /b/a already exists.
-      assertEquals(ome.getResult(), OMException.ResultCodes.KEY_ALREADY_EXISTS);
-    }
-  }
-
-  /**
-   * Case-6) Rename directory to an existed file, should be failed.
-   */
-  protected void testRenameDirToFile() throws Exception {
-    final String root = "/root";
-    Path rootPath = new Path(fs.getUri().toString() + root);
-    fs.mkdirs(rootPath);
-
-    Path file1Destin = new Path(fs.getUri().toString() + root + "/file1");
-    ContractTestUtils.touch(fs, file1Destin);
-    Path abcRootPath = new Path(fs.getUri().toString() + "/a/b/c");
-    fs.mkdirs(abcRootPath);
-    try {
-      fs.rename(abcRootPath, file1Destin);
-      Assert.fail("key already exists /root_dir/file1");
-    } catch (OMException ome) {
-      // expected
-      assertEquals(ome.getResult(), OMException.ResultCodes.KEY_ALREADY_EXISTS);
-    }
-  }
-
-  /**
    * Fails if the (a) parent of dst does not exist or (b) parent is a file.
    */
   protected void testRenameDestinationParentDoesntExist() throws Exception {
@@ -425,128 +345,90 @@ public class TestOzoneFileSystemV1 extends TestOzoneFileSystem {
     testCreateFileShouldCheckExistenceOfDirWithSameName();
     // TODO: Cleanup keyTable and dirTable explicitly as FS delete operation
     //  is not yet implemented. This should be replaced with fs.delete() call.
-    tableCleanup();
+    deleteRootDir();
     testMakeDirsWithAnExistingDirectoryPath();
-    tableCleanup();
+    deleteRootDir();
     testCreateWithInvalidPaths();
-    tableCleanup();
+    deleteRootDir();
     testListStatusWithoutRecursiveSearch();
-    tableCleanup();
+    deleteRootDir();
     testListFilesRecursive();
-    tableCleanup();
+    deleteRootDir();
 
     testGetDirectoryModificationTime();
-    tableCleanup();
+    deleteRootDir();
 
     testListStatusOnRoot();
-    tableCleanup();
+    deleteRootDir();
     testListStatus();
-    tableCleanup();
+    deleteRootDir();
     testListStatusOnSubDirs();
-    tableCleanup();
+    deleteRootDir();
     testListStatusOnLargeDirectory();
-    tableCleanup();
+    deleteRootDir();
 
     testNonExplicitlyCreatedPathExistsAfterItsLeafsWereRemoved();
-    tableCleanup();
+    deleteRootDir();
 
     testRenameDir();
-    tableCleanup();
+    deleteRootDir();
     testRenameFile();
-    tableCleanup();
+    deleteRootDir();
     testRenameWithNonExistentSource();
-    tableCleanup();
+    deleteRootDir();
     testRenameDirToItsOwnSubDir();
-    tableCleanup();
+    deleteRootDir();
     testRenameSourceAndDestinAreSame();
-    tableCleanup();
+    deleteRootDir();
     testRenameToExistingDir();
-    tableCleanup();
+    deleteRootDir();
     testRenameToNewSubDirShouldNotExist();
-    tableCleanup();
+    deleteRootDir();
     testRenameDirToFile();
-    tableCleanup();
+    deleteRootDir();
     testRenameFileToDir();
-    tableCleanup();
+    deleteRootDir();
     testRenameDestinationParentDoesntExist();
-    tableCleanup();
+    deleteRootDir();
     testRenameToParentDir();
-    tableCleanup();
+    deleteRootDir();
 
     testSeekOnFileLength();
-    tableCleanup();
+    deleteRootDir();
+
+    testFileDelete();
+    deleteRootDir();
+
+    testDeleteRoot();
+    deleteRootDir();
+
+    testRecursiveDelete();
+    deleteRootDir();
   }
 
   /**
-   * Cleanup keyTable and directoryTable explicitly as FS delete operation
-   * is not yet supported.
+   * Cleanup files and directories.
    *
    * @throws IOException DB failure
    */
-  protected void tableCleanup() throws IOException {
-    OMMetadataManager metadataMgr = cluster.getOzoneManager()
-            .getMetadataManager();
-    TableIterator<String, ? extends
-            Table.KeyValue<String, OmDirectoryInfo>> dirTableIterator =
-            metadataMgr.getDirectoryTable().iterator();
-    dirTableIterator.seekToFirst();
-    ArrayList <String> dirList = new ArrayList<>();
-    while (dirTableIterator.hasNext()) {
-      String key = dirTableIterator.key();
-      if (StringUtils.isNotBlank(key)) {
-        dirList.add(key);
-      }
-      dirTableIterator.next();
-    }
-
-    Iterator<Map.Entry<CacheKey<String>, CacheValue<OmDirectoryInfo>>>
-            cacheIterator = metadataMgr.getDirectoryTable().cacheIterator();
-    while(cacheIterator.hasNext()){
-      cacheIterator.next();
-      cacheIterator.remove();
-    }
+  protected void deleteRootDir() throws IOException {
+    Path root = new Path("/");
+    FileStatus[] fileStatuses = fs.listStatus(root);
 
-    for (String dirKey : dirList) {
-      metadataMgr.getDirectoryTable().delete(dirKey);
-      Assert.assertNull("Unexpected entry!",
-              metadataMgr.getDirectoryTable().get(dirKey));
+    if (fileStatuses == null) {
+      return;
     }
 
-    Assert.assertTrue("DirTable is not empty",
-            metadataMgr.getDirectoryTable().isEmpty());
-
-    Assert.assertFalse(metadataMgr.getDirectoryTable().cacheIterator()
-            .hasNext());
-
-    TableIterator<String, ? extends
-            Table.KeyValue<String, OmKeyInfo>> keyTableIterator =
-            metadataMgr.getKeyTable().iterator();
-    keyTableIterator.seekToFirst();
-    ArrayList <String> fileList = new ArrayList<>();
-    while (keyTableIterator.hasNext()) {
-      String key = keyTableIterator.key();
-      if (StringUtils.isNotBlank(key)) {
-        fileList.add(key);
-      }
-      keyTableIterator.next();
+    for (FileStatus fStatus : fileStatuses) {
+      fs.delete(fStatus.getPath(), true);
     }
 
-    Iterator<Map.Entry<CacheKey<String>, CacheValue<OmKeyInfo>>>
-            keyCacheIterator = metadataMgr.getKeyTable().cacheIterator();
-    while(keyCacheIterator.hasNext()){
-      keyCacheIterator.next();
-      keyCacheIterator.remove();
+    fileStatuses = fs.listStatus(root);
+    if (fileStatuses != null) {
+      Assert.assertEquals("Delete root failed!", 0, fileStatuses.length);
+      rootItemCount = 0;
+      return;
     }
-
-    for (String fileKey : fileList) {
-      metadataMgr.getKeyTable().delete(fileKey);
-      Assert.assertNull("Unexpected entry!",
-              metadataMgr.getKeyTable().get(fileKey));
-    }
-
-    Assert.assertTrue("KeyTable is not empty",
-            metadataMgr.getKeyTable().isEmpty());
-
     rootItemCount = 0;
   }
 
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 59843d3..45e969d 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -320,6 +320,7 @@ enum Status {
 
     QUOTA_ERROR = 67;
 
+    DIRECTORY_NOT_EMPTY = 68;
 }
 
 /**
@@ -730,6 +731,9 @@ message KeyArgs {
 
     // This will be set by leader OM in HA and update the original request.
     optional FileEncryptionInfoProto fileEncryptionInfo = 15;
+
+    // This will be set when user performs delete directory recursively.
+    optional bool recursive = 16;
 }
 
 message KeyLocation {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 6bb8846..348db50 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -2454,7 +2454,8 @@ public class KeyManagerImpl implements KeyManager {
 
     while (iterator.hasNext() && numEntries - countEntries > 0) {
       OmDirectoryInfo dirInfo = iterator.value().getValue();
-      if (!isImmediateChild(dirInfo.getParentObjectID(), prefixKeyInDB)) {
+      if (!OMFileRequest.isImmediateChild(dirInfo.getParentObjectID(),
+              prefixKeyInDB)) {
         break;
       }
 
@@ -2489,7 +2490,8 @@ public class KeyManagerImpl implements KeyManager {
     while (iterator.hasNext() && numEntries - countEntries > 0) {
       OmKeyInfo keyInfo = iterator.value().getValue();
 
-      if (!isImmediateChild(keyInfo.getParentObjectID(), prefixKeyInDB)) {
+      if (!OMFileRequest.isImmediateChild(keyInfo.getParentObjectID(),
+              prefixKeyInDB)) {
         break;
       }
 
@@ -2504,10 +2506,6 @@ public class KeyManagerImpl implements KeyManager {
     return countEntries;
   }
 
-  private boolean isImmediateChild(long parentId, long ancestorId) {
-    return parentId == ancestorId;
-  }
-
   /**
    * Helper function for listStatus to find key in FileTableCache.
    */
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index d2dd5c7..7b5988c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequestV1;
 import org.apache.hadoop.ozone.om.request.key.OMKeyCreateRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyDeleteRequest;
+import org.apache.hadoop.ozone.om.request.key.OMKeyDeleteRequestV1;
 import org.apache.hadoop.ozone.om.request.key.OMKeyPurgeRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyRenameRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyRenameRequestV1;
@@ -146,6 +147,9 @@ public final class OzoneManagerRatisUtils {
       }
       return new OMKeyCommitRequest(omRequest);
     case DeleteKey:
+      if (omLayoutVersionV1) {
+        return new OMKeyDeleteRequestV1(omRequest);
+      }
       return new OMKeyDeleteRequest(omRequest);
     case DeleteKeys:
       return new OMKeysDeleteRequest(omRequest);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
index e7b43d6..fc9bab0 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
@@ -25,11 +25,14 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Strings;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.OzoneAcl;
@@ -759,4 +762,93 @@ public final class OMFileRequest {
     }
     return toKeyParentDirStatus.getKeyInfo().getObjectID();
   }
+
+  /**
+   * Check whether any sub path exists for the given user key path.
+   *
+   * @param omKeyInfo key info of the path to check for sub paths
+   * @param metaMgr   OMMetadataManager
+   * @return true if at least one sub path exists, false otherwise
+   * @throws IOException DB exception
+   */
+  public static boolean hasChildren(OmKeyInfo omKeyInfo,
+      OMMetadataManager metaMgr) throws IOException {
+    return checkSubDirectoryExists(omKeyInfo, metaMgr) ||
+            checkSubFileExists(omKeyInfo, metaMgr);
+  }
+
+  private static boolean checkSubDirectoryExists(OmKeyInfo omKeyInfo,
+      OMMetadataManager metaMgr) throws IOException {
+    // Check all dirTable cache for any sub paths.
+    Table dirTable = metaMgr.getDirectoryTable();
+    Iterator<Map.Entry<CacheKey<String>, CacheValue<OmDirectoryInfo>>>
+            cacheIter = dirTable.cacheIterator();
+
+    while (cacheIter.hasNext()) {
+      Map.Entry<CacheKey<String>, CacheValue<OmDirectoryInfo>> entry =
+              cacheIter.next();
+      OmDirectoryInfo cacheOmDirInfo = entry.getValue().getCacheValue();
+      if (cacheOmDirInfo == null) {
+        continue;
+      }
+      if (isImmediateChild(cacheOmDirInfo.getParentObjectID(),
+              omKeyInfo.getObjectID())) {
+        return true; // found a sub path directory
+      }
+    }
+
+    // Check dirTable entries for any sub paths.
+    String seekDirInDB = metaMgr.getOzonePathKey(omKeyInfo.getObjectID(), "");
+    TableIterator<String, ? extends Table.KeyValue<String, OmDirectoryInfo>>
+            iterator = dirTable.iterator();
+
+    iterator.seek(seekDirInDB);
+
+    if (iterator.hasNext()) {
+      OmDirectoryInfo dirInfo = iterator.value().getValue();
+      return isImmediateChild(dirInfo.getParentObjectID(),
+              omKeyInfo.getObjectID());
+    }
+    return false; // no sub paths found
+  }
+
+  private static boolean checkSubFileExists(OmKeyInfo omKeyInfo,
+      OMMetadataManager metaMgr) throws IOException {
+    // Check all fileTable cache for any sub paths.
+    Table fileTable = metaMgr.getKeyTable();
+    Iterator<Map.Entry<CacheKey<String>, CacheValue<OmKeyInfo>>>
+            cacheIter = fileTable.cacheIterator();
+
+    while (cacheIter.hasNext()) {
+      Map.Entry<CacheKey<String>, CacheValue<OmKeyInfo>> entry =
+              cacheIter.next();
+      OmKeyInfo cacheOmFileInfo = entry.getValue().getCacheValue();
+      if (cacheOmFileInfo == null) {
+        continue;
+      }
+      if (isImmediateChild(cacheOmFileInfo.getParentObjectID(),
+              omKeyInfo.getObjectID())) {
+        return true; // found a sub path file
+      }
+    }
+
+    // Check fileTable entries for any sub paths.
+    String seekFileInDB = metaMgr.getOzonePathKey(
+            omKeyInfo.getObjectID(), "");
+    TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
+            iterator = fileTable.iterator();
+
+    iterator.seek(seekFileInDB);
+
+    if (iterator.hasNext()) {
+      OmKeyInfo fileInfo = iterator.value().getValue();
+      return isImmediateChild(fileInfo.getParentObjectID(),
+              omKeyInfo.getObjectID()); // found a sub path file
+    }
+    return false; // no sub paths found
+  }
+
+  public static boolean isImmediateChild(long parentId, long ancestorId) {
+    return parentId == ancestorId;
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequestV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequestV1.java
new file mode 100644
index 0000000..93531bc
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequestV1.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.key;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.key.OMKeyDeleteResponseV1;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeyRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeyResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DIRECTORY_NOT_EMPTY;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+
+/**
+ * Handles DeleteKey request layout version V1.
+ * <p>
+ * The requested path is looked up through
+ * {@link OMFileRequest#getOMKeyInfoIfExists} and the matching entry is marked
+ * as deleted in either the directory table cache or the key table cache,
+ * depending on whether the path resolves to a directory or to a file.
+ */
+public class OMKeyDeleteRequestV1 extends OMKeyDeleteRequest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMKeyDeleteRequestV1.class);
+
+  /**
+   * Creates a V1 delete request handler.
+   *
+   * @param omRequest the OM request carrying the DeleteKey payload
+   */
+  public OMKeyDeleteRequestV1(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  /**
+   * Validates the delete request and applies it to the table caches.
+   * <p>
+   * After resolving the bucket link and checking the DELETE ACL, and while
+   * holding the bucket write lock, this looks up the key, rejects a
+   * non-recursive delete of a directory that still has children, adds an
+   * absent cache entry for the deleted path and adjusts the bucket usage
+   * counters. The response is handed to the double buffer on both the
+   * success and the failure path.
+   *
+   * @param ozoneManager         OzoneManager providing the metrics, the audit
+   *                             logger and the metadata manager
+   * @param trxnLogIndex         transaction log index; used as the update ID
+   *                             of the key and passed to the new cache
+   *                             entries
+   * @param omDoubleBufferHelper helper through which the response is added
+   *                             to the double buffer
+   * @return the response for this request; an error response when an
+   *         IOException was raised while processing it
+   */
+  @Override
+  @SuppressWarnings("methodlength")
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
+    DeleteKeyRequest deleteKeyRequest = getOmRequest().getDeleteKeyRequest();
+
+    OzoneManagerProtocolProtos.KeyArgs keyArgs =
+        deleteKeyRequest.getKeyArgs();
+    Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
+
+    String volumeName = keyArgs.getVolumeName();
+    String bucketName = keyArgs.getBucketName();
+    String keyName = keyArgs.getKeyName();
+    boolean recursive = keyArgs.getRecursive();
+
+    // The delete attempt is counted up front, whatever its outcome.
+    OMMetrics omMetrics = ozoneManager.getMetrics();
+    omMetrics.incNumKeyDeletes();
+
+    AuditLogger auditLogger = ozoneManager.getAuditLogger();
+    OzoneManagerProtocolProtos.UserInfo userInfo = getOmRequest().getUserInfo();
+
+    OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
+        getOmRequest());
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    IOException exception = null;
+    boolean acquiredLock = false;
+    OMClientResponse omClientResponse = null;
+    Result result = null;
+    // NOTE(review): omVolumeArgs is never assigned or read in this method.
+    OmVolumeArgs omVolumeArgs = null;
+    OmBucketInfo omBucketInfo = null;
+    try {
+      // From here on use the volume/bucket names returned by the bucket
+      // link resolution instead of the ones sent in the request.
+      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
+      volumeName = keyArgs.getVolumeName();
+      bucketName = keyArgs.getBucketName();
+
+      // check Acl
+      checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
+          IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
+
+      acquiredLock = omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
+          volumeName, bucketName);
+
+      // Validate bucket and volume exists or not.
+      validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+
+      OzoneFileStatus keyStatus =
+              OMFileRequest.getOMKeyInfoIfExists(omMetadataManager, volumeName,
+                      bucketName, keyName, 0);
+
+      if (keyStatus == null) {
+        throw new OMException("Key not found. Key:" + keyName, KEY_NOT_FOUND);
+      }
+
+      OmKeyInfo omKeyInfo = keyStatus.getKeyInfo();
+
+      // Set the UpdateID to current transactionLogIndex
+      omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
+
+      // Table key of the entry to delete, built from the parent object ID
+      // and the file name.
+      String ozonePathKey = omMetadataManager.getOzonePathKey(
+              omKeyInfo.getParentObjectID(), omKeyInfo.getFileName());
+
+      if (keyStatus.isDirectory()) {
+        // A non-recursive delete is rejected when any sub path exists under
+        // the user requested path.
+        // NOTE(review): for a recursive delete only the directory's own
+        // entry is removed here; nothing in this method touches its
+        // descendants - confirm they are cleaned up elsewhere.
+        if (!recursive && OMFileRequest.hasChildren(omKeyInfo,
+                omMetadataManager)) {
+          throw new OMException("Directory is not empty. Key:" + keyName,
+                  DIRECTORY_NOT_EMPTY);
+        }
+
+        // Update dir cache.
+        omMetadataManager.getDirectoryTable().addCacheEntry(
+                new CacheKey<>(ozonePathKey),
+                new CacheValue<>(Optional.absent(), trxnLogIndex));
+      } else {
+        // Update table cache.
+        omMetadataManager.getKeyTable().addCacheEntry(
+                new CacheKey<>(ozonePathKey),
+                new CacheValue<>(Optional.absent(), trxnLogIndex));
+      }
+
+      omBucketInfo = getBucketInfo(omMetadataManager, volumeName, bucketName);
+
+      // Give back the bytes summed over the key's blocks and one namespace
+      // entry to the bucket usage counters.
+      long quotaReleased = sumBlockLengths(omKeyInfo);
+      omBucketInfo.incrUsedBytes(-quotaReleased);
+      omBucketInfo.incrUsedNamespace(-1L);
+
+      // No need to add cache entries to delete table. As delete table will
+      // be used by DeleteKeyService only, not used for any client response
+      // validation, so we don't need to add to cache.
+      // TODO: Revisit if we need it later.
+
+      omClientResponse = new OMKeyDeleteResponseV1(omResponse
+          .setDeleteKeyResponse(DeleteKeyResponse.newBuilder()).build(),
+          omKeyInfo, ozoneManager.isRatisEnabled(),
+          omBucketInfo, keyStatus.isDirectory());
+
+      result = Result.SUCCESS;
+    } catch (IOException ex) {
+      result = Result.FAILURE;
+      exception = ex;
+      omClientResponse = new OMKeyDeleteResponseV1(
+          createErrorOMResponse(omResponse, exception));
+    } finally {
+      // The response is added to the double buffer before the bucket lock
+      // is released.
+      addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
+            omDoubleBufferHelper);
+      if (acquiredLock) {
+        omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
+            bucketName);
+      }
+    }
+
+    // Performing audit logging outside of the lock.
+    auditLog(auditLogger, buildAuditMessage(OMAction.DELETE_KEY, auditMap,
+        exception, userInfo));
+
+
+    // Update the metrics and log according to the outcome recorded above.
+    switch (result) {
+    case SUCCESS:
+      omMetrics.decNumKeys();
+      LOG.debug("Key deleted. Volume:{}, Bucket:{}, Key:{}", volumeName,
+          bucketName, keyName);
+      break;
+    case FAILURE:
+      omMetrics.incNumKeyDeleteFails();
+      LOG.error("Key delete failed. Volume:{}, Bucket:{}, Key:{}.",
+          volumeName, bucketName, keyName, exception);
+      break;
+    default:
+      LOG.error("Unrecognized Result for OMKeyDeleteRequest: {}",
+          deleteKeyRequest);
+    }
+
+    return omClientResponse;
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestV1.java
index 74e53fe..ba022c5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestV1.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestV1.java
@@ -90,7 +90,6 @@ public class OMKeyRenameRequestV1 extends OMKeyRenameRequest {
     OMClientResponse omClientResponse = null;
     IOException exception = null;
     OmKeyInfo fromKeyValue;
-    String fromKey = null;
     Result result;
     try {
       if (toKeyName.length() == 0 || fromKeyName.length() == 0) {
@@ -122,7 +121,7 @@ public class OMKeyRenameRequestV1 extends OMKeyRenameRequest {
       // case-1) fromKeyName should exist, otw throws exception
       if (fromKeyFileStatus == null) {
         // TODO: Add support for renaming open key
-        throw new OMException("Key not found " + fromKey, KEY_NOT_FOUND);
+        throw new OMException("Key not found " + fromKeyName, KEY_NOT_FOUND);
       }
 
       // source existed
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponse.java
index 58785c0..868d8c9 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponse.java
@@ -75,4 +75,12 @@ public class OMKeyDeleteResponse extends AbstractOMKeyDeleteResponse {
         omMetadataManager.getBucketKey(omBucketInfo.getVolumeName(),
             omBucketInfo.getBucketName()), omBucketInfo);
   }
+
+  /**
+   * Gives subclasses access to the key info held by this response.
+   *
+   * @return the key info this response was created with
+   */
+  protected OmKeyInfo getOmKeyInfo() {
+    return this.omKeyInfo;
+  }
+
+  /**
+   * Gives subclasses access to the bucket info held by this response.
+   *
+   * @return the bucket info this response was created with
+   */
+  protected OmBucketInfo getOmBucketInfo() {
+    return this.omBucketInfo;
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponseV1.java
similarity index 60%
copy from hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponse.java
copy to hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponseV1.java
index 58785c0..15c1ba6 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponseV1.java
@@ -18,43 +18,42 @@
 
 package org.apache.hadoop.ozone.om.response.key;
 
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .OMResponse;
-import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 
-import java.io.IOException;
 import javax.annotation.Nonnull;
+import java.io.IOException;
 
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DIRECTORY_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE;
 
 /**
  * Response for DeleteKey request.
  */
-@CleanupTableInfo(cleanupTables = {KEY_TABLE, DELETED_TABLE})
-public class OMKeyDeleteResponse extends AbstractOMKeyDeleteResponse {
+@CleanupTableInfo(cleanupTables = {FILE_TABLE, DIRECTORY_TABLE, DELETED_TABLE})
+public class OMKeyDeleteResponseV1 extends OMKeyDeleteResponse {
 
-  private OmKeyInfo omKeyInfo;
-  private OmBucketInfo omBucketInfo;
+  private boolean isDeleteDirectory;
 
-  public OMKeyDeleteResponse(@Nonnull OMResponse omResponse,
+  public OMKeyDeleteResponseV1(@Nonnull OMResponse omResponse,
       @Nonnull OmKeyInfo omKeyInfo, boolean isRatisEnabled,
-      @Nonnull OmBucketInfo omBucketInfo) {
-    super(omResponse, isRatisEnabled);
-    this.omKeyInfo = omKeyInfo;
-    this.omBucketInfo = omBucketInfo;
+      @Nonnull OmBucketInfo omBucketInfo,
+      @Nonnull boolean isDeleteDirectory) {
+    super(omResponse, omKeyInfo, isRatisEnabled, omBucketInfo);
+    this.isDeleteDirectory = isDeleteDirectory;
   }
 
   /**
    * For when the request is not successful.
    * For a successful request, the other constructor should be used.
    */
-  public OMKeyDeleteResponse(@Nonnull OMResponse omResponse) {
+  public OMKeyDeleteResponseV1(@Nonnull OMResponse omResponse) {
     super(omResponse);
   }
 
@@ -64,15 +63,21 @@ public class OMKeyDeleteResponse extends AbstractOMKeyDeleteResponse {
 
     // For OmResponse with failure, this should do nothing. This method is
     // not called in failure scenario in OM code.
-    String ozoneKey = omMetadataManager.getOzoneKey(omKeyInfo.getVolumeName(),
-        omKeyInfo.getBucketName(), omKeyInfo.getKeyName());
-    Table<String, OmKeyInfo> keyTable = omMetadataManager.getKeyTable();
-    addDeletionToBatch(omMetadataManager, batchOperation, keyTable, ozoneKey,
-        omKeyInfo);
+    String ozoneDbKey = omMetadataManager.getOzonePathKey(
+            getOmKeyInfo().getParentObjectID(), getOmKeyInfo().getFileName());
+
+    if (isDeleteDirectory) {
+      omMetadataManager.getDirectoryTable().deleteWithBatch(batchOperation,
+              ozoneDbKey);
+    } else {
+      Table<String, OmKeyInfo> keyTable = omMetadataManager.getKeyTable();
+      addDeletionToBatch(omMetadataManager, batchOperation, keyTable,
+              ozoneDbKey, getOmKeyInfo());
+    }
 
     // update bucket usedBytes.
     omMetadataManager.getBucketTable().putWithBatch(batchOperation,
-        omMetadataManager.getBucketKey(omBucketInfo.getVolumeName(),
-            omBucketInfo.getBucketName()), omBucketInfo);
+            omMetadataManager.getBucketKey(getOmBucketInfo().getVolumeName(),
+                    getOmBucketInfo().getBucketName()), getOmBucketInfo());
   }
 }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java
index 2d4b5cb..61fd676 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java
@@ -910,6 +910,9 @@ public final class TestOMRequestUtils {
           throws Exception {
     long bucketId = TestOMRequestUtils.getBucketId(volumeName, bucketName,
             omMetaMgr);
+    if (org.apache.commons.lang3.StringUtils.isBlank(key)) {
+      return bucketId;
+    }
     String[] pathComponents = StringUtils.split(key, '/');
     long objectId = bucketId + 10;
     long parentId = bucketId;
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequest.java
index b8e5603..b5af354 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequest.java
@@ -46,27 +46,23 @@ public class TestOMKeyDeleteRequest extends TestOMKeyRequest {
 
   @Test
   public void testValidateAndUpdateCache() throws Exception {
-    OMRequest modifiedOmRequest =
-        doPreExecute(createDeleteKeyRequest());
-
-    OMKeyDeleteRequest omKeyDeleteRequest =
-        new OMKeyDeleteRequest(modifiedOmRequest);
-
     // Add volume, bucket and key entries to OM DB.
     TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
         omMetadataManager);
 
-    TestOMRequestUtils.addKeyToTable(false, volumeName, bucketName, keyName,
-        clientID, replicationType, replicationFactor, omMetadataManager);
-
-    String ozoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
-        keyName);
+    String ozoneKey = addKeyToTable();
 
     OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(ozoneKey);
 
     // As we added manually to key table.
     Assert.assertNotNull(omKeyInfo);
 
+    OMRequest modifiedOmRequest =
+            doPreExecute(createDeleteKeyRequest());
+
+    OMKeyDeleteRequest omKeyDeleteRequest =
+            getOmKeyDeleteRequest(modifiedOmRequest);
+
     OMClientResponse omClientResponse =
         omKeyDeleteRequest.validateAndUpdateCache(ozoneManager,
         100L, ozoneManagerDoubleBufferHelper);
@@ -86,7 +82,7 @@ public class TestOMKeyDeleteRequest extends TestOMKeyRequest {
         doPreExecute(createDeleteKeyRequest());
 
     OMKeyDeleteRequest omKeyDeleteRequest =
-        new OMKeyDeleteRequest(modifiedOmRequest);
+            getOmKeyDeleteRequest(modifiedOmRequest);
 
     // Add only volume and bucket entry to DB.
     // In actual implementation we don't check for bucket/volume exists
@@ -108,7 +104,7 @@ public class TestOMKeyDeleteRequest extends TestOMKeyRequest {
         doPreExecute(createDeleteKeyRequest());
 
     OMKeyDeleteRequest omKeyDeleteRequest =
-        new OMKeyDeleteRequest(modifiedOmRequest);
+            getOmKeyDeleteRequest(modifiedOmRequest);
 
     OMClientResponse omClientResponse =
         omKeyDeleteRequest.validateAndUpdateCache(ozoneManager,
@@ -124,7 +120,7 @@ public class TestOMKeyDeleteRequest extends TestOMKeyRequest {
         doPreExecute(createDeleteKeyRequest());
 
     OMKeyDeleteRequest omKeyDeleteRequest =
-        new OMKeyDeleteRequest(modifiedOmRequest);
+            getOmKeyDeleteRequest(modifiedOmRequest);
 
     TestOMRequestUtils.addVolumeToDB(volumeName, omMetadataManager);
 
@@ -145,7 +141,7 @@ public class TestOMKeyDeleteRequest extends TestOMKeyRequest {
   private OMRequest doPreExecute(OMRequest originalOmRequest) throws Exception {
 
     OMKeyDeleteRequest omKeyDeleteRequest =
-        new OMKeyDeleteRequest(originalOmRequest);
+            getOmKeyDeleteRequest(originalOmRequest);
 
     OMRequest modifiedOmRequest = omKeyDeleteRequest.preExecute(ozoneManager);
 
@@ -170,4 +166,18 @@ public class TestOMKeyDeleteRequest extends TestOMKeyRequest {
         .setCmdType(OzoneManagerProtocolProtos.Type.DeleteKey)
         .setClientId(UUID.randomUUID().toString()).build();
   }
+
+  /**
+   * Adds the test key to the key table and returns its DB key. Subclasses
+   * may override this to populate the tables differently.
+   *
+   * @return the ozone key of the added entry
+   * @throws Exception if adding the key fails
+   */
+  protected String addKeyToTable() throws Exception {
+    TestOMRequestUtils.addKeyToTable(false, volumeName, bucketName, keyName,
+            clientID, replicationType, replicationFactor, omMetadataManager);
+
+    String ozoneKey =
+            omMetadataManager.getOzoneKey(volumeName, bucketName, keyName);
+    return ozoneKey;
+  }
+
+  /**
+   * Builds the delete request under test. Subclasses may override this to
+   * return a different request implementation.
+   *
+   * @param modifiedOmRequest the OM request to wrap
+   * @return a new delete request for the given OM request
+   */
+  protected OMKeyDeleteRequest getOmKeyDeleteRequest(
+      OMRequest modifiedOmRequest) {
+    OMKeyDeleteRequest deleteRequest =
+            new OMKeyDeleteRequest(modifiedOmRequest);
+    return deleteRequest;
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequestV1.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequestV1.java
new file mode 100644
index 0000000..dbba143
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequestV1.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.key;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.util.Time;
+
+/**
+ * Tests OmKeyDelete request layout version V1.
+ * <p>
+ * Runs the test cases inherited from {@link TestOMKeyDeleteRequest} and
+ * overrides the hooks that create the request and populate the tables.
+ */
+public class TestOMKeyDeleteRequestV1 extends TestOMKeyDeleteRequest {
+
+  /**
+   * Creates the V1 flavour of the delete request.
+   *
+   * @param modifiedOmRequest the OM request to wrap
+   * @return a new {@link OMKeyDeleteRequestV1} for the given OM request
+   */
+  @Override
+  protected OMKeyDeleteRequest getOmKeyDeleteRequest(
+      OMRequest modifiedOmRequest) {
+    return new OMKeyDeleteRequestV1(modifiedOmRequest);
+  }
+
+  /**
+   * Adds the parent directories "c/d/e" to the directory table and the file
+   * "file1" under them to the key table. As a side effect {@code keyName}
+   * is changed to the full path of that file.
+   *
+   * @return the path of the added file as reported by its key info
+   * @throws Exception if adding the entries fails
+   */
+  @Override
+  protected String addKeyToTable() throws Exception {
+    String parentDir = "c/d/e";
+    String fileName = "file1";
+    String key = parentDir + "/" + fileName;
+    keyName = key; // updated key name
+
+    // Create parent dirs for the path
+    long parentId = TestOMRequestUtils.addParentsToDirTable(volumeName,
+            bucketName, parentDir, omMetadataManager);
+
+    OmKeyInfo omKeyInfo =
+            TestOMRequestUtils.createOmKeyInfo(volumeName, bucketName, key,
+                    HddsProtos.ReplicationType.RATIS,
+                    HddsProtos.ReplicationFactor.ONE,
+                    parentId + 1,
+                    parentId, 100, Time.now());
+    TestOMRequestUtils.addFileToKeyTable(false, false,
+            fileName, omKeyInfo, -1, 50, omMetadataManager);
+    return omKeyInfo.getPath();
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponse.java
index 871e39f..a56b91e 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponse.java
@@ -40,17 +40,18 @@ import java.util.List;
  */
 public class TestOMKeyDeleteResponse extends TestOMKeyResponse {
 
+  private OmBucketInfo omBucketInfo;
+
   @Test
   public void testAddToDBBatch() throws Exception {
-
-    OmKeyInfo omKeyInfo = TestOMRequestUtils.createOmKeyInfo(volumeName,
-        bucketName, keyName, replicationType, replicationFactor);
     OmVolumeArgs omVolumeArgs = OmVolumeArgs.newBuilder()
-        .setOwnerName(keyName).setAdminName(keyName)
-        .setVolume(volumeName).setCreationTime(Time.now()).build();
-    OmBucketInfo omBucketInfo = OmBucketInfo.newBuilder()
-        .setVolumeName(volumeName).setBucketName(bucketName)
-        .setCreationTime(Time.now()).build();
+            .setOwnerName(keyName).setAdminName(keyName)
+            .setVolume(volumeName).setCreationTime(Time.now()).build();
+    omBucketInfo = OmBucketInfo.newBuilder()
+            .setVolumeName(volumeName).setBucketName(bucketName)
+            .setCreationTime(Time.now()).build();
+
+    OmKeyInfo omKeyInfo = getOmKeyInfo();
 
     OzoneManagerProtocolProtos.OMResponse omResponse =
         OzoneManagerProtocolProtos.OMResponse.newBuilder().setDeleteKeyResponse(
@@ -59,14 +60,10 @@ public class TestOMKeyDeleteResponse extends TestOMKeyResponse {
             .setCmdType(OzoneManagerProtocolProtos.Type.DeleteKey)
             .build();
 
-    OMKeyDeleteResponse omKeyDeleteResponse = new OMKeyDeleteResponse(
-        omResponse, omKeyInfo, true, omBucketInfo);
+    OMKeyDeleteResponse omKeyDeleteResponse = getOmKeyDeleteResponse(omKeyInfo,
+            omResponse);
 
-    String ozoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
-        keyName);
-
-    TestOMRequestUtils.addKeyToTable(false, volumeName, bucketName, keyName,
-        clientID, replicationType, replicationFactor, omMetadataManager);
+    String ozoneKey = addKeyToTable();
 
     Assert.assertTrue(omMetadataManager.getKeyTable().isExist(ozoneKey));
     omKeyDeleteResponse.addToDBBatch(omMetadataManager, batchOperation);
@@ -84,15 +81,14 @@ public class TestOMKeyDeleteResponse extends TestOMKeyResponse {
 
   @Test
   public void testAddToDBBatchWithNonEmptyBlocks() throws Exception {
-
-    OmKeyInfo omKeyInfo = TestOMRequestUtils.createOmKeyInfo(volumeName,
-        bucketName, keyName, replicationType, replicationFactor);
     OmVolumeArgs omVolumeArgs = OmVolumeArgs.newBuilder()
-        .setOwnerName(keyName).setAdminName(keyName)
-        .setVolume(volumeName).setCreationTime(Time.now()).build();
-    OmBucketInfo omBucketInfo = OmBucketInfo.newBuilder()
-        .setVolumeName(volumeName).setBucketName(bucketName)
-        .setCreationTime(Time.now()).build();
+            .setOwnerName(keyName).setAdminName(keyName)
+            .setVolume(volumeName).setCreationTime(Time.now()).build();
+    omBucketInfo = OmBucketInfo.newBuilder()
+            .setVolumeName(volumeName).setBucketName(bucketName)
+            .setCreationTime(Time.now()).build();
+
+    OmKeyInfo omKeyInfo = getOmKeyInfo();
 
     // Add block to key.
     List<OmKeyLocationInfo> omKeyLocationInfoList = new ArrayList<>();
@@ -115,10 +111,7 @@ public class TestOMKeyDeleteResponse extends TestOMKeyResponse {
 
     omKeyInfo.appendNewBlocks(omKeyLocationInfoList, false);
 
-    String ozoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
-        keyName);
-
-    omMetadataManager.getKeyTable().put(ozoneKey, omKeyInfo);
+    String ozoneKey = addKeyToTable();
 
     OzoneManagerProtocolProtos.OMResponse omResponse =
         OzoneManagerProtocolProtos.OMResponse.newBuilder().setDeleteKeyResponse(
@@ -127,8 +120,8 @@ public class TestOMKeyDeleteResponse extends TestOMKeyResponse {
             .setCmdType(OzoneManagerProtocolProtos.Type.DeleteKey)
             .build();
 
-    OMKeyDeleteResponse omKeyDeleteResponse = new OMKeyDeleteResponse(
-        omResponse, omKeyInfo, true, omBucketInfo);
+    OMKeyDeleteResponse omKeyDeleteResponse = getOmKeyDeleteResponse(omKeyInfo,
+            omResponse);
 
     Assert.assertTrue(omMetadataManager.getKeyTable().isExist(ozoneKey));
     omKeyDeleteResponse.addToDBBatch(omMetadataManager, batchOperation);
@@ -146,12 +139,10 @@ public class TestOMKeyDeleteResponse extends TestOMKeyResponse {
 
   @Test
   public void testAddToDBBatchWithErrorResponse() throws Exception {
-    OmKeyInfo omKeyInfo = TestOMRequestUtils.createOmKeyInfo(volumeName,
-        bucketName, keyName, replicationType, replicationFactor);
-
-    OmBucketInfo omBucketInfo = OmBucketInfo.newBuilder()
-        .setVolumeName(volumeName).setBucketName(bucketName)
-        .setCreationTime(Time.now()).build();
+    omBucketInfo = OmBucketInfo.newBuilder()
+            .setVolumeName(volumeName).setBucketName(bucketName)
+            .setCreationTime(Time.now()).build();
+    OmKeyInfo omKeyInfo = getOmKeyInfo();
 
     OzoneManagerProtocolProtos.OMResponse omResponse =
         OzoneManagerProtocolProtos.OMResponse.newBuilder().setDeleteKeyResponse(
@@ -160,14 +151,10 @@ public class TestOMKeyDeleteResponse extends TestOMKeyResponse {
             .setCmdType(OzoneManagerProtocolProtos.Type.DeleteKey)
             .build();
 
-    OMKeyDeleteResponse omKeyDeleteResponse = new OMKeyDeleteResponse(
-        omResponse, omKeyInfo, true, omBucketInfo);
+    OMKeyDeleteResponse omKeyDeleteResponse = getOmKeyDeleteResponse(omKeyInfo,
+            omResponse);
 
-    String ozoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
-        keyName);
-
-    TestOMRequestUtils.addKeyToTable(false, volumeName, bucketName, keyName,
-        clientID, replicationType, replicationFactor, omMetadataManager);
+    String ozoneKey = addKeyToTable();
 
     Assert.assertTrue(omMetadataManager.getKeyTable().isExist(ozoneKey));
 
@@ -181,4 +168,22 @@ public class TestOMKeyDeleteResponse extends TestOMKeyResponse {
     Assert.assertTrue(omMetadataManager.getKeyTable().isExist(ozoneKey));
 
   }
+
+  /**
+   * Adds the test key to the key table and returns its DB key. Subclasses
+   * may override this to populate the tables differently.
+   *
+   * @return the ozone key of the added entry
+   * @throws Exception if adding the key fails
+   */
+  protected String addKeyToTable() throws Exception {
+    final String dbKey =
+            omMetadataManager.getOzoneKey(volumeName, bucketName, keyName);
+
+    TestOMRequestUtils.addKeyToTable(false, volumeName, bucketName, keyName,
+            clientID, replicationType, replicationFactor, omMetadataManager);
+
+    return dbKey;
+  }
+
+  /**
+   * Builds the delete response under test from the given key info and OM
+   * response, together with the bucket info prepared by the running test.
+   * Subclasses may override this to return a different implementation.
+   *
+   * @param omKeyInfo  key info of the key being deleted
+   * @param omResponse OM response to wrap
+   * @return a new delete response
+   */
+  protected OMKeyDeleteResponse getOmKeyDeleteResponse(OmKeyInfo omKeyInfo,
+      OzoneManagerProtocolProtos.OMResponse omResponse) {
+    final boolean ratisEnabled = true;
+    return new OMKeyDeleteResponse(omResponse, omKeyInfo, ratisEnabled,
+            omBucketInfo);
+  }
+
+  /**
+   * Gives subclasses access to the bucket info prepared by the running test.
+   *
+   * @return the bucket info currently held by this test instance
+   */
+  protected OmBucketInfo getOmBucketInfo() {
+    return this.omBucketInfo;
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponseV1.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponseV1.java
new file mode 100644
index 0000000..3cfec38
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponseV1.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.key;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+
+/**
+ * Tests OMKeyDeleteResponse layout version V1.
+ * <p>
+ * Runs the test cases inherited from {@link TestOMKeyDeleteResponse} and
+ * overrides the hooks that create the response, the key info and the table
+ * entries.
+ */
+public class TestOMKeyDeleteResponseV1 extends TestOMKeyDeleteResponse {
+
+  /**
+   * Creates the V1 response for a file (not a directory) deletion.
+   */
+  @Override
+  protected OMKeyDeleteResponse getOmKeyDeleteResponse(OmKeyInfo omKeyInfo,
+      OzoneManagerProtocolProtos.OMResponse omResponse) {
+    final boolean ratisEnabled = true;
+    final boolean deleteDirectory = false;
+    return new OMKeyDeleteResponseV1(omResponse, omKeyInfo, ratisEnabled,
+            getOmBucketInfo(), deleteDirectory);
+  }
+
+  /**
+   * Adds the volume, the bucket and the test file to the OM DB and returns
+   * the path of the added file as reported by its key info.
+   */
+  @Override
+  protected String addKeyToTable() throws Exception {
+    // Volume and bucket entries have to be in the OM DB first.
+    TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+            omMetadataManager);
+
+    // An empty path is passed, so no parent directory names are given.
+    long parentObjectId = TestOMRequestUtils.addParentsToDirTable(
+            volumeName, bucketName, "", omMetadataManager);
+    long fileObjectId = parentObjectId + 1;
+
+    OmKeyInfo fileInfo = TestOMRequestUtils.createOmKeyInfo(
+            volumeName, bucketName, keyName,
+            HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.ONE,
+            fileObjectId, parentObjectId, 100, Time.now());
+    TestOMRequestUtils.addFileToKeyTable(false, false, keyName, fileInfo,
+            -1, 50, omMetadataManager);
+
+    return fileInfo.getPath();
+  }
+
+  /**
+   * Creates the key info of the test file, using the object ID of the bucket
+   * info as parent ID and the next ID as the file's own object ID.
+   */
+  @Override
+  protected OmKeyInfo getOmKeyInfo() {
+    Assert.assertNotNull(getOmBucketInfo());
+
+    String bucket = getOmBucketInfo().getBucketName();
+    long parentObjectId = getOmBucketInfo().getObjectID();
+    long fileObjectId = getOmBucketInfo().getObjectID() + 1;
+
+    return TestOMRequestUtils.createOmKeyInfo(volumeName, bucket, keyName,
+            replicationType, replicationFactor, fileObjectId, parentObjectId,
+            100, Time.now());
+  }
+}
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
index f95d9ee..0a8edff 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
@@ -255,6 +256,7 @@ public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter {
     return true;
   }
 
+
   /**
    * Helper method to delete an object specified by key name in bucket.
    *
@@ -262,12 +264,32 @@ public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter {
    * @return true if the key is deleted, false otherwise
    */
   @Override
-  public boolean deleteObject(String keyName) {
+  public boolean deleteObject(String keyName) throws IOException {
+    // The single-argument form is the non-recursive delete.
+    return deleteObject(keyName, false);
+  }
+
+  /**
+   * Helper method to delete an object specified by key name in bucket.
+   *
+   * @param keyName key name to be deleted
+   * @param recursive recursive deletion of all sub path keys if true,
+   *                  otherwise non-recursive
+   * @return true if the key is deleted, false otherwise
+   */
+  @Override
+  public boolean deleteObject(String keyName, boolean recursive)
+      throws IOException {
     LOG.trace("issuing delete for key {}", keyName);
     try {
       incrementCounter(Statistic.OBJECTS_DELETED, 1);
-      bucket.deleteKey(keyName);
+      bucket.deleteDirectory(keyName, recursive);
       return true;
+    } catch (OMException ome) {
+      LOG.error("delete key failed {}", ome.getMessage());
+      if (OMException.ResultCodes.DIRECTORY_NOT_EMPTY == ome.getResult()) {
+        throw new PathIsNotEmptyDirectoryException(ome.getMessage());
+      }
+      return false;
     } catch (IOException ioe) {
       LOG.error("delete key failed {}", ioe.getMessage());
       return false;
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
index 1da183e..5aff713 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
@@ -413,7 +413,17 @@ public class BasicOzoneFileSystem extends FileSystem {
   }
 
   private boolean renameV1(String srcPath, String dstPath) throws IOException {
-    adapter.renameKey(srcPath, dstPath);
+    try {
+      adapter.renameKey(srcPath, dstPath);
+    } catch (OMException omEx) {
+      LOG.error("rename key failed: {}. source:{}, destin:{}",
+              omEx.getMessage(), srcPath, dstPath);
+      // Only an already-existing key is reported as a failed rename.
+      if (OMException.ResultCodes.KEY_ALREADY_EXISTS != omEx.getResult()) {
+        throw omEx;
+      }
+      return false;
+    }
     return true;
   }
 
@@ -496,6 +506,20 @@ public class BasicOzoneFileSystem extends FileSystem {
     incrementCounter(Statistic.INVOCATION_DELETE, 1);
     statistics.incrementWriteOps(1);
     LOG.debug("Delete path {} - recursive {}", f, recursive);
+
+    String layOutVersion = adapter.getBucketLayoutVersion();
+    if (layOutVersion != null &&
+            OMConfigKeys.OZONE_OM_LAYOUT_VERSION_V1.equals(layOutVersion)) {
+
+      if (f.isRoot()) {
+        LOG.warn("Cannot delete root directory.");
+        return false;
+      }
+
+      String key = pathToKey(f);
+      return adapter.deleteObject(key, recursive);
+    }
+
     FileStatus status;
     try {
       status = getFileStatus(f);
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
index 3e968ad..65f8411 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
@@ -439,10 +440,13 @@ public class BasicRootedOzoneClientAdapterImpl
    * Helper method to delete an object specified by key name in bucket.
    *
    * @param path path to a key to be deleted
+   * @param recursive recursive deletion of all sub path keys if true,
+   *                  otherwise non-recursive
    * @return true if the key is deleted, false otherwise
    */
   @Override
-  public boolean deleteObject(String path) {
+  public boolean deleteObject(String path, boolean recursive)
+      throws IOException {
     LOG.trace("issuing delete for path to key: {}", path);
     incrementCounter(Statistic.OBJECTS_DELETED, 1);
     OFSPath ofsPath = new OFSPath(path);
@@ -452,14 +456,25 @@ public class BasicRootedOzoneClientAdapterImpl
     }
     try {
       OzoneBucket bucket = getBucket(ofsPath, false);
-      bucket.deleteKey(keyName);
+      bucket.deleteDirectory(keyName, recursive);
       return true;
+    } catch (OMException ome) {
+      LOG.error("delete key failed {}", ome.getMessage());
+      if (OMException.ResultCodes.DIRECTORY_NOT_EMPTY == ome.getResult()) {
+        throw new PathIsNotEmptyDirectoryException(ome.getMessage());
+      }
+      return false;
     } catch (IOException ioe) {
       LOG.error("delete key failed " + ioe.getMessage());
       return false;
     }
   }
 
+  /**
+   * Helper method to delete an object specified by path, non-recursively.
+   * Delegates to {@link #deleteObject(String, boolean)} with
+   * {@code recursive} set to {@code false}.
+   *
+   * @param path path to a key to be deleted
+   * @return true if the key is deleted, false otherwise
+   * @throws IOException propagated from the two-argument overload
+   */
+  @Override
+  public boolean deleteObject(String path) throws IOException {
+    return deleteObject(path, false);
+  }
+
   /**
    * Helper function to check if the list of key paths are in the same volume
    * and same bucket.
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
index 5b65a0e..4a4d91b 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
@@ -51,7 +51,9 @@ public interface OzoneClientAdapter {
 
   boolean createDirectory(String keyName) throws IOException;
 
-  boolean deleteObject(String keyName);
+  /**
+   * Deletes the object specified by key name. The adapters in this package
+   * implement it as {@code deleteObject(keyName, false)}.
+   *
+   * @param keyName key name to be deleted
+   * @return true if the key is deleted, false otherwise
+   * @throws IOException see {@link #deleteObject(String, boolean)}
+   */
+  boolean deleteObject(String keyName) throws IOException;
+
+  /**
+   * Deletes the object specified by key name.
+   *
+   * @param keyName key name to be deleted
+   * @param recursive recursive deletion of all sub path keys if true,
+   *                  otherwise non-recursive
+   * @return true if the key is deleted, false otherwise
+   * @throws IOException the adapters in this package throw
+   *         PathIsNotEmptyDirectoryException when OM reports
+   *         DIRECTORY_NOT_EMPTY; other failures are logged and reported
+   *         as a false return value
+   */
+  boolean deleteObject(String keyName, boolean recursive) throws IOException;
 
   boolean deleteObjects(List<String> keyName);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org