You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ra...@apache.org on 2020/11/19 15:54:05 UTC

[ozone] branch HDDS-2939 updated: HDDS-4357: Rename : make rename an atomic ops by updating key path entry in dir/file table (#1557)

This is an automated email from the ASF dual-hosted git repository.

rakeshr pushed a commit to branch HDDS-2939
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-2939 by this push:
     new ff04733  HDDS-4357: Rename : make rename an atomic ops by updating key path entry in dir/file table (#1557)
ff04733 is described below

commit ff04733f18879572db9c28a8a68e4122ea42708b
Author: Rakesh Radhakrishnan <ra...@apache.org>
AuthorDate: Thu Nov 19 21:23:45 2020 +0530

    HDDS-4357: Rename : make rename an atomic ops by updating key path entry in dir/file table (#1557)
    
    HDDS-4357: Rename : make rename an atomic ops by updating key path entry in dir/file table
---
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   2 +
 .../apache/hadoop/ozone/om/helpers/OmKeyInfo.java  |   4 +
 .../hadoop/ozone/om/helpers/OzoneFSUtils.java      |  43 +++
 .../hadoop/fs/ozone/TestOzoneFileSystem.java       | 298 ++++++++++++++++++++-
 .../hadoop/fs/ozone/TestOzoneFileSystemV1.java     | 193 ++++++++++++-
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  27 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |   2 +
 .../om/ratis/utils/OzoneManagerRatisUtils.java     |   4 +
 .../om/request/bucket/OMBucketCreateRequest.java   |  24 ++
 .../ozone/om/request/file/OMFileRequest.java       |  94 +++++++
 .../ozone/om/request/key/OMKeyRenameRequestV1.java | 292 ++++++++++++++++++++
 .../ozone/om/response/key/OMKeyRenameResponse.java |  11 +
 ...ameResponse.java => OMKeyRenameResponseV1.java} |  68 +++--
 .../fs/ozone/BasicOzoneClientAdapterImpl.java      |   5 +
 .../hadoop/fs/ozone/BasicOzoneFileSystem.java      |  13 +
 .../ozone/BasicRootedOzoneClientAdapterImpl.java   |   7 +
 .../apache/hadoop/fs/ozone/OzoneClientAdapter.java |   1 +
 17 files changed, 1038 insertions(+), 50 deletions(-)

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index db3c453..9685997 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -253,4 +253,6 @@ public final class OMConfigKeys {
   public static final String OZONE_OM_LAYOUT_VERSION =
           "ozone.om.layout.version";
   public static final String OZONE_OM_LAYOUT_VERSION_DEFAULT = "V0";
+
+  public static final String OZONE_OM_LAYOUT_VERSION_V1 = "V1";
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
index 494e1b9..596f43f 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
@@ -281,6 +281,10 @@ public final class OmKeyInfo extends WithObjectID {
     return OzoneAclUtil.setAcl(acls, newAcls);
   }
 
+  public void setParentObjectID(long parentObjectID) {
+    this.parentObjectID = parentObjectID;
+  }
+
   /**
    * Builder of OmKeyInfo.
    */
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneFSUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneFSUtils.java
index 63bfd8f..e9d4cf9 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneFSUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneFSUtils.java
@@ -162,4 +162,47 @@ public final class OzoneFSUtils {
 
     return parentPath.equals(childParent);
   }
+
+  /**
+   * The function returns parent directory from the given absolute path. For
+   * example, the given key path '/a/b/c/d/e/file1' then it returns parent
+   * directory name as 'e'.
+   *
+   * @param keyName key name
+   * @return parent directory. If not found then return keyName itself.
+   */
+  public static String getParentDir(@Nonnull String keyName) {
+    java.nio.file.Path fileName = Paths.get(keyName).getParent();
+    if (fileName != null) {
+      return fileName.toString();
+    }
+    // failed to find a parent directory.
+    return keyName;
+  }
+
+  /**
+   * This function appends the given file name to the given key name path.
+   *
+   * @param keyName key name
+   * @param fileName  file name
+   * @return full path
+   */
+  public static String appendFileNameToKeyPath(String keyName,
+                                               String fileName) {
+    StringBuilder newToKeyName = new StringBuilder(keyName);
+    newToKeyName.append(OZONE_URI_DELIMITER);
+    newToKeyName.append(fileName);
+    return newToKeyName.toString();
+  }
+
+  /**
+   * Returns the number of path components in the given keyName.
+   *
+   * @param keyName keyname
+   * @return path components count
+   */
+  public static int getFileCount(String keyName) {
+    java.nio.file.Path keyPath = Paths.get(keyName);
+    return keyPath.getNameCount();
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
index 0e422fe..13c4c44 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
@@ -267,6 +267,16 @@ public class TestOzoneFileSystem {
     testNonExplicitlyCreatedPathExistsAfterItsLeafsWereRemoved();
 
     testRenameDir();
+    testRenameFile();
+    testRenameWithNonExistentSource();
+    testRenameDirToItsOwnSubDir();
+    testRenameSourceAndDestinAreSame();
+    testRenameToExistingDir();
+    testRenameToNewSubDirShouldNotExist();
+    testRenameDirToFile();
+    testRenameFileToDir();
+    testRenameDestinationParentDoesntExist();
+    testRenameToParentDir();
     testSeekOnFileLength();
     testDeleteRoot();
 
@@ -621,6 +631,16 @@ public class TestOzoneFileSystem {
       stream.seek(fileLength);
       assertEquals(-1, stream.read());
     }
+
+    // non-existent file
+    Path fileNotExists = new Path("/file_notexist");
+    try {
+      fs.open(fileNotExists);
+      Assert.fail("Should throw FILE_NOT_FOUND error as file doesn't exist!");
+    } catch (FileNotFoundException fnfe) {
+      Assert.assertTrue("Expected FILE_NOT_FOUND error",
+              fnfe.getMessage().contains("FILE_NOT_FOUND"));
+    }
   }
 
   public void testDeleteRoot() throws IOException {
@@ -653,8 +673,277 @@ public class TestOzoneFileSystem {
         interimPath.getName(), fileStatus.getPath().getName());
   }
 
-  private void testRenameDir() throws Exception {
+  /**
+   * Case-1) fromKeyName should exist, otw throws exception.
+   */
+  protected void testRenameWithNonExistentSource() throws Exception {
+    final String root = "/root";
+    final String dir1 = root + "/dir1";
+    final String dir2 = root + "/dir2";
+    final Path source = new Path(fs.getUri().toString() + dir1);
+    final Path destin = new Path(fs.getUri().toString() + dir2);
+
+    // creates destin
+    fs.mkdirs(destin);
+    LOG.info("Created destin dir: {}", destin);
+
+    LOG.info("Rename op-> source:{} to destin:{}}", source, destin);
+    assertFalse("Expected to fail rename as src doesn't exist",
+            fs.rename(source, destin));
+  }
+
+  /**
+   * Case-2) Cannot rename a directory to its own subdirectory.
+   */
+  protected void testRenameDirToItsOwnSubDir() throws Exception {
+    final String root = "/root";
+    final String dir1 = root + "/dir1";
+    final Path dir1Path = new Path(fs.getUri().toString() + dir1);
+    // Add a sub-dir1 to the directory to be moved.
+    final Path subDir1 = new Path(dir1Path, "sub_dir1");
+    fs.mkdirs(subDir1);
+    LOG.info("Created dir1 {}", subDir1);
+
+    final Path sourceRoot = new Path(fs.getUri().toString() + root);
+    LOG.info("Rename op-> source:{} to destin:{}", sourceRoot, subDir1);
+    try {
+      fs.rename(sourceRoot, subDir1);
+      Assert.fail("Should throw exception : Cannot rename a directory to" +
+              " its own subdirectory");
+    } catch (IllegalArgumentException iae) {
+      // expected
+    }
+  }
+
+  /**
+   * Case-3) If src == destin then check source and destin of same type.
+   */
+  protected void testRenameSourceAndDestinAreSame() throws Exception {
+    final String root = "/root";
+    final String dir1 = root + "/dir1";
+    final String dir2 = dir1 + "/dir2";
+    final Path dir2Path = new Path(fs.getUri().toString() + dir2);
+    fs.mkdirs(dir2Path);
+
+    // File rename
+    Path file1 = new Path(fs.getUri().toString() + dir2 + "/file1");
+    ContractTestUtils.touch(fs, file1);
+
+    assertTrue(fs.rename(file1, file1));
+    assertTrue(fs.rename(dir2Path, dir2Path));
+  }
+
+  /**
+   * Case-4) Rename from /a, to /b.
+   * <p>
+   * Expected Result: After rename the directory structure will be /b/a.
+   */
+  protected void testRenameToExistingDir() throws Exception {
+    // created /a
+    final Path aSourcePath = new Path(fs.getUri().toString() + "/a");
+    fs.mkdirs(aSourcePath);
+
+    // created /b
+    final Path bDestinPath = new Path(fs.getUri().toString() + "/b");
+    fs.mkdirs(bDestinPath);
+
+    // Add a sub-directory '/a/c' to '/a'. This is to verify that after
+    // rename sub-directory also be moved.
+    final Path acPath = new Path(fs.getUri().toString() + "/a/c");
+    fs.mkdirs(acPath);
+
+    // Rename from /a to /b.
+    assertTrue("Rename failed", fs.rename(aSourcePath, bDestinPath));
+
+    final Path baPath = new Path(fs.getUri().toString() + "/b/a");
+    final Path bacPath = new Path(fs.getUri().toString() + "/b/a/c");
+    assertTrue("Rename failed", fs.exists(baPath));
+    assertTrue("Rename failed", fs.exists(bacPath));
+  }
+
+  /**
+   * Case-5) If new destin '/dst/source' exists then throws exception.
+   * If destination is a directory then rename source as sub-path of it.
+   * <p>
+   * For example: rename /a to /b will lead to /b/a. This new path should
+   * not exist.
+   */
+  protected void testRenameToNewSubDirShouldNotExist() throws Exception {
+    // Case-5.a) Rename directory from /a to /b.
+    // created /a
+    final Path aSourcePath = new Path(fs.getUri().toString() + "/a");
+    fs.mkdirs(aSourcePath);
+
+    // created /b
+    final Path bDestinPath = new Path(fs.getUri().toString() + "/b");
+    fs.mkdirs(bDestinPath);
+
+    // Add a sub-directory '/b/a' to '/b'. This is to verify that rename
+    // throws exception as new destin /b/a already exists.
+    final Path baPath = new Path(fs.getUri().toString() + "/b/a");
+    fs.mkdirs(baPath);
+
+    try {
+      fs.rename(aSourcePath, bDestinPath);
+      Assert.fail("Should fail as new destination dir exists!");
+    } catch (FileAlreadyExistsException faee) {
+      // expected as new sub-path /b/a already exists.
+    }
+
+    // Case-5.b) Rename file from /a/b/c/file1 to /a.
+    // Should be failed since /a/file1 exists.
+    final Path abcPath = new Path(fs.getUri().toString() + "/a/b/c");
+    fs.mkdirs(abcPath);
+    Path abcFile1 = new Path(abcPath, "/file1");
+    ContractTestUtils.touch(fs, abcFile1);
+
+    final Path aFile1 = new Path(fs.getUri().toString() + "/a/file1");
+    ContractTestUtils.touch(fs, aFile1);
+
+    final Path aDestinPath = new Path(fs.getUri().toString() + "/a");
+
+    try {
+      fs.rename(abcFile1, aDestinPath);
+      Assert.fail("Should fail as new destination file exists!");
+    } catch (FileAlreadyExistsException faee) {
+      // expected as new sub-path /a/file1 already exists.
+    }
+  }
+
+  /**
+   * Case-6) Rename directory to an existed file, should be failed.
+   */
+  protected void testRenameDirToFile() throws Exception {
+    final String root = "/root";
+    Path rootPath = new Path(fs.getUri().toString() + root);
+    fs.mkdirs(rootPath);
+
+    Path file1Destin = new Path(fs.getUri().toString() + root + "/file1");
+    ContractTestUtils.touch(fs, file1Destin);
+    Path abcRootPath = new Path(fs.getUri().toString() + "/a/b/c");
+    fs.mkdirs(abcRootPath);
+    try {
+      fs.rename(abcRootPath, file1Destin);
+      Assert.fail("key already exists /root_dir/file1");
+    } catch (FileAlreadyExistsException faee) {
+      // expected
+    }
+  }
+
+  /**
+   * Rename file to a non-existent destin file.
+   */
+  protected void testRenameFile() throws Exception {
+    final String root = "/root";
+    Path rootPath = new Path(fs.getUri().toString() + root);
+    fs.mkdirs(rootPath);
+
+    Path file1Source = new Path(fs.getUri().toString() + root
+            + "/file1_Copy");
+    ContractTestUtils.touch(fs, file1Source);
+    Path file1Destin = new Path(fs.getUri().toString() + root + "/file1");
+    assertTrue("Renamed failed", fs.rename(file1Source, file1Destin));
+    assertTrue("Renamed failed: /root/file1", fs.exists(file1Destin));
+
+    /**
+     * Reading several times, this is to verify that OmKeyInfo#keyName cached
+     * entry is not modified. While reading back, OmKeyInfo#keyName will be
+     * prepared and assigned to fullkeyPath name.
+     */
+    for (int i = 0; i < 10; i++) {
+      FileStatus[] fStatus = fs.listStatus(rootPath);
+      assertEquals("Renamed failed", 1, fStatus.length);
+      assertEquals("Wrong path name!", file1Destin, fStatus[0].getPath());
+    }
+  }
+
+  /**
+   * Rename file to an existed directory.
+   */
+  protected void testRenameFileToDir() throws Exception {
+    final String root = "/root";
+    Path rootPath = new Path(fs.getUri().toString() + root);
+    fs.mkdirs(rootPath);
+
+    Path file1Destin = new Path(fs.getUri().toString() + root + "/file1");
+    ContractTestUtils.touch(fs, file1Destin);
+    Path abcRootPath = new Path(fs.getUri().toString() + "/a/b/c");
+    fs.mkdirs(abcRootPath);
+    assertTrue("Renamed failed", fs.rename(file1Destin, abcRootPath));
+    assertTrue("Renamed filed: /a/b/c/file1", fs.exists(new Path(abcRootPath,
+            "file1")));
+  }
+
+
+  /**
+   * Fails if the (a) parent of dst does not exist or (b) parent is a file.
+   */
+  protected void testRenameDestinationParentDoesntExist() throws Exception {
+    final String root = "/root_dir";
+    final String dir1 = root + "/dir1";
+    final String dir2 = dir1 + "/dir2";
+    final Path dir2SourcePath = new Path(fs.getUri().toString() + dir2);
+    fs.mkdirs(dir2SourcePath);
+
+    // (a) parent of dst does not exist.  /root_dir/b/c
+    final Path destinPath = new Path(fs.getUri().toString() + root + "/b/c");
+    try {
+      fs.rename(dir2SourcePath, destinPath);
+      Assert.fail("Should fail as parent of dst does not exist!");
+    } catch (FileNotFoundException fnfe) {
+      // expected
+    }
+
+    // (b) parent of dst is a file. /root_dir/file1/c
+    Path filePath = new Path(fs.getUri().toString() + root + "/file1");
+    ContractTestUtils.touch(fs, filePath);
+
+    Path newDestinPath = new Path(filePath, "c");
+    try {
+      fs.rename(dir2SourcePath, newDestinPath);
+      Assert.fail("Should fail as parent of dst is a file!");
+    } catch (IOException ioe) {
+      // expected
+    }
+  }
+
+  /**
+   * Rename to the source's parent directory, it will succeed.
+   * 1. Rename from /root_dir/dir1/dir2 to /root_dir.
+   * Expected result : /root_dir/dir2
+   * <p>
+   * 2. Rename from /root_dir/dir1/file1 to /root_dir.
+   * Expected result : /root_dir/file1.
+   */
+  protected void testRenameToParentDir() throws Exception {
+    final String root = "/root_dir";
+    final String dir1 = root + "/dir1";
+    final String dir2 = dir1 + "/dir2";
+    final Path dir2SourcePath = new Path(fs.getUri().toString() + dir2);
+    fs.mkdirs(dir2SourcePath);
+    final Path destRootPath = new Path(fs.getUri().toString() + root);
+
+    Path file1Source = new Path(fs.getUri().toString() + dir1 + "/file2");
+    ContractTestUtils.touch(fs, file1Source);
+
+    // rename source directory to its parent directory(destination).
+    assertTrue("Rename failed", fs.rename(dir2SourcePath, destRootPath));
+    final Path expectedPathAfterRename =
+            new Path(fs.getUri().toString() + root + "/dir2");
+    assertTrue("Rename failed",
+            fs.exists(expectedPathAfterRename));
+
+    // rename source file to its parent directory(destination).
+    assertTrue("Rename failed", fs.rename(file1Source, destRootPath));
+    final Path expectedFilePathAfterRename =
+            new Path(fs.getUri().toString() + root + "/file2");
+    assertTrue("Rename failed",
+            fs.exists(expectedFilePathAfterRename));
+  }
+
+  protected void testRenameDir() throws Exception {
     final String dir = "/root_dir/dir1";
+    Path rootDir = new Path(fs.getUri().toString() +  "/root_dir");
     final Path source = new Path(fs.getUri().toString() + dir);
     final Path dest = new Path(source.toString() + ".renamed");
     // Add a sub-dir to the directory to be moved.
@@ -676,6 +965,13 @@ public class TestOzoneFileSystem {
     // Renaming to same path when src is specified with scheme.
     assertTrue("Renaming to same path should be success.",
         fs.rename(source, new Path(dir)));
+
+    // rename root directory
+    Path rootDestinDir = new Path(fs.getUri().toString() +  "/root_dir" +
+            ".renamed");
+    fs.rename(rootDir, rootDestinDir);
+    assertTrue("Directory rename failed", fs.exists(rootDestinDir));
+    assertFalse("Directory rename failed", fs.exists(rootDir));
   }
   private OzoneKeyDetails getKey(Path keyPath, boolean isDirectory)
       throws IOException {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemV1.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemV1.java
index 415aec8..21e2157 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemV1.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemV1.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.jetbrains.annotations.NotNull;
@@ -50,9 +52,7 @@ import java.util.Iterator;
 import java.util.Map;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 /**
  * Ozone file system tests that are not covered by contract tests,
@@ -260,6 +260,160 @@ public class TestOzoneFileSystemV1 extends TestOzoneFileSystem {
             expectedFilesCount, actualCount);
   }
 
+  /**
+   * Case-1) fromKeyName should exist, otw throws exception.
+   */
+  protected void testRenameWithNonExistentSource() throws Exception {
+    final String root = "/root";
+    final String dir1 = root + "/dir1";
+    final String dir2 = root + "/dir2";
+    final Path source = new Path(fs.getUri().toString() + dir1);
+    final Path destin = new Path(fs.getUri().toString() + dir2);
+
+    // creates destin
+    fs.mkdirs(destin);
+    LOG.info("Created destin dir: {}", destin);
+
+    LOG.info("Rename op-> source:{} to destin:{}}", source, destin);
+    try {
+      fs.rename(source, destin);
+      Assert.fail("Should throw exception : Source doesn't exist!");
+    } catch (OMException ome) {
+      // expected
+      assertEquals(ome.getResult(), OMException.ResultCodes.KEY_NOT_FOUND);
+    }
+  }
+
+  /**
+   * Case-2) Cannot rename a directory to its own subdirectory.
+   */
+  protected void testRenameDirToItsOwnSubDir() throws Exception {
+    final String root = "/root";
+    final String dir1 = root + "/dir1";
+    final Path dir1Path = new Path(fs.getUri().toString() + dir1);
+    // Add a sub-dir1 to the directory to be moved.
+    final Path subDir1 = new Path(dir1Path, "sub_dir1");
+    fs.mkdirs(subDir1);
+    LOG.info("Created dir1 {}", subDir1);
+
+    final Path sourceRoot = new Path(fs.getUri().toString() + root);
+    LOG.info("Rename op-> source:{} to destin:{}", sourceRoot, subDir1);
+    try {
+      fs.rename(sourceRoot, subDir1);
+      Assert.fail("Should throw exception : Cannot rename a directory to" +
+              " its own subdirectory");
+    } catch (OMException ome) {
+      // expected
+      assertEquals(ome.getResult(), OMException.ResultCodes.KEY_RENAME_ERROR);
+    }
+  }
+
+  /**
+   * Case-5) If new destin '/dst/source' exists then throws exception.
+   * If destination is a directory then rename source as sub-path of it.
+   * <p>
+   * For example: rename /a to /b will lead to /b/a. This new path should
+   * not exist.
+   */
+  protected void testRenameToNewSubDirShouldNotExist() throws Exception {
+    // Case-5.a) Rename directory from /a to /b.
+    // created /a
+    final Path aSourcePath = new Path(fs.getUri().toString() + "/a");
+    fs.mkdirs(aSourcePath);
+
+    // created /b
+    final Path bDestinPath = new Path(fs.getUri().toString() + "/b");
+    fs.mkdirs(bDestinPath);
+
+    // Add a sub-directory '/b/a' to '/b'. This is to verify that rename
+    // throws exception as new destin /b/a already exists.
+    final Path baPath = new Path(fs.getUri().toString() + "/b/a");
+    fs.mkdirs(baPath);
+
+    try {
+      fs.rename(aSourcePath, bDestinPath);
+      Assert.fail("Should fail as new destination dir exists!");
+    } catch (OMException ome) {
+      // expected as new sub-path /b/a already exists.
+      assertEquals(ome.getResult(), OMException.ResultCodes.KEY_ALREADY_EXISTS);
+    }
+
+    // Case-5.b) Rename file from /a/b/c/file1 to /a.
+    // Should be failed since /a/file1 exists.
+    final Path abcPath = new Path(fs.getUri().toString() + "/a/b/c");
+    fs.mkdirs(abcPath);
+    Path abcFile1 = new Path(abcPath, "/file1");
+    ContractTestUtils.touch(fs, abcFile1);
+
+    final Path aFile1 = new Path(fs.getUri().toString() + "/a/file1");
+    ContractTestUtils.touch(fs, aFile1);
+
+    final Path aDestinPath = new Path(fs.getUri().toString() + "/a");
+
+    try {
+      fs.rename(abcFile1, aDestinPath);
+      Assert.fail("Should fail as new destination file exists!");
+    } catch (OMException ome) {
+      // expected as new sub-path /b/a already exists.
+      assertEquals(ome.getResult(), OMException.ResultCodes.KEY_ALREADY_EXISTS);
+    }
+  }
+
+  /**
+   * Case-6) Rename directory to an existed file, should be failed.
+   */
+  protected void testRenameDirToFile() throws Exception {
+    final String root = "/root";
+    Path rootPath = new Path(fs.getUri().toString() + root);
+    fs.mkdirs(rootPath);
+
+    Path file1Destin = new Path(fs.getUri().toString() + root + "/file1");
+    ContractTestUtils.touch(fs, file1Destin);
+    Path abcRootPath = new Path(fs.getUri().toString() + "/a/b/c");
+    fs.mkdirs(abcRootPath);
+    try {
+      fs.rename(abcRootPath, file1Destin);
+      Assert.fail("key already exists /root_dir/file1");
+    } catch (OMException ome) {
+      // expected
+      assertEquals(ome.getResult(), OMException.ResultCodes.KEY_ALREADY_EXISTS);
+    }
+  }
+
+  /**
+   * Fails if the (a) parent of dst does not exist or (b) parent is a file.
+   */
+  protected void testRenameDestinationParentDoesntExist() throws Exception {
+    final String root = "/root_dir";
+    final String dir1 = root + "/dir1";
+    final String dir2 = dir1 + "/dir2";
+    final Path dir2SourcePath = new Path(fs.getUri().toString() + dir2);
+    fs.mkdirs(dir2SourcePath);
+
+    // (a) parent of dst does not exist.  /root_dir/b/c
+    final Path destinPath = new Path(fs.getUri().toString() + root + "/b/c");
+    try {
+      fs.rename(dir2SourcePath, destinPath);
+      Assert.fail("Should fail as parent of dst does not exist!");
+    } catch (OMException ome) {
+      // expected
+      assertEquals(ome.getResult(), OMException.ResultCodes.KEY_RENAME_ERROR);
+    }
+
+    // (b) parent of dst is a file. /root_dir/file1/c
+    Path filePath = new Path(fs.getUri().toString() + root + "/file1");
+    ContractTestUtils.touch(fs, filePath);
+
+    Path newDestinPath = new Path(filePath, "c");
+    try {
+      fs.rename(dir2SourcePath, newDestinPath);
+      Assert.fail("Should fail as parent of dst is a file!");
+    } catch (OMException ome) {
+      // expected
+      assertEquals(ome.getResult(), OMException.ResultCodes.KEY_RENAME_ERROR);
+    }
+  }
+
   @Test(timeout = 300_000)
   @Override
   public void testFileSystem() throws Exception {
@@ -292,6 +446,35 @@ public class TestOzoneFileSystemV1 extends TestOzoneFileSystem {
     tableCleanup();
     testListStatusOnLargeDirectory();
     tableCleanup();
+
+    testNonExplicitlyCreatedPathExistsAfterItsLeafsWereRemoved();
+    tableCleanup();
+
+    testRenameDir();
+    tableCleanup();
+    testRenameFile();
+    tableCleanup();
+    testRenameWithNonExistentSource();
+    tableCleanup();
+    testRenameDirToItsOwnSubDir();
+    tableCleanup();
+    testRenameSourceAndDestinAreSame();
+    tableCleanup();
+    testRenameToExistingDir();
+    tableCleanup();
+    testRenameToNewSubDirShouldNotExist();
+    tableCleanup();
+    testRenameDirToFile();
+    tableCleanup();
+    testRenameFileToDir();
+    tableCleanup();
+    testRenameDestinationParentDoesntExist();
+    tableCleanup();
+    testRenameToParentDir();
+    tableCleanup();
+
+    testSeekOnFileLength();
+    tableCleanup();
   }
 
   /**
@@ -348,8 +531,8 @@ public class TestOzoneFileSystemV1 extends TestOzoneFileSystem {
       keyTableIterator.next();
     }
 
-    Iterator<Map.Entry<CacheKey<String>, CacheValue<OmDirectoryInfo>>>
-            keyCacheIterator = metadataMgr.getDirectoryTable().cacheIterator();
+    Iterator<Map.Entry<CacheKey<String>, CacheValue<OmKeyInfo>>>
+            keyCacheIterator = metadataMgr.getKeyTable().cacheIterator();
     while(keyCacheIterator.hasNext()){
       keyCacheIterator.next();
       keyCacheIterator.remove();
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 27ccf21..8c2426b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -2053,10 +2053,16 @@ public class KeyManagerImpl implements KeyManager {
     String volumeName = args.getVolumeName();
     String bucketName = args.getBucketName();
     String keyName = args.getKeyName();
-    OzoneFileStatus fileStatus = getOzoneFileStatus(volumeName, bucketName,
-            keyName, args.getRefreshPipeline(), args.getSortDatanodes(),
-            clientAddress);
-      //if key is not of type file or if key is not found we throw an exception
+    OzoneFileStatus fileStatus;
+    if (OzoneManagerRatisUtils.isOmLayoutVersionV1()) {
+      fileStatus = getOzoneFileStatusV1(volumeName, bucketName, keyName,
+              args.getSortDatanodes(), clientAddress, false);
+    } else {
+      fileStatus = getOzoneFileStatus(volumeName, bucketName,
+              keyName, args.getRefreshPipeline(), args.getSortDatanodes(),
+              clientAddress);
+    }
+    //if key is not of type file or if key is not found we throw an exception
     if (fileStatus.isFile()) {
       return fileStatus.getKeyInfo();
     }
@@ -2536,13 +2542,18 @@ public class KeyManagerImpl implements KeyManager {
         continue;
       }
 
-      cacheOmKeyInfo.setFileName(cacheOmKeyInfo.getKeyName());
+      // make OmKeyInfo local copy to reset keyname to "fullKeyPath".
+      // In DB keyName stores only the leaf node but the list
+      // returning to the user should have full path.
+      OmKeyInfo omKeyInfo = cacheOmKeyInfo.copyObject();
+
+      omKeyInfo.setFileName(omKeyInfo.getKeyName());
       String fullKeyPath = OMFileRequest.getAbsolutePath(prefixKeyPath,
-              cacheOmKeyInfo.getKeyName());
-      cacheOmKeyInfo.setKeyName(fullKeyPath);
+              omKeyInfo.getKeyName());
+      omKeyInfo.setKeyName(fullKeyPath);
 
       countEntries = addKeyInfoToFileStatusList(fileStatusList, prefixKeyInDB,
-              seekKeyInDB, startKey, countEntries, cacheKey, cacheOmKeyInfo,
+              seekKeyInDB, startKey, countEntries, cacheKey, omKeyInfo,
               false);
     }
     return countEntries;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index c485235..02ba17f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -3609,6 +3609,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     boolean omLayoutVersionV1 =
             StringUtils.equalsIgnoreCase(version, "V1");
     OzoneManagerRatisUtils.setOmLayoutVersionV1(omLayoutVersionV1);
+    LOG.info("Configured {}={} and enabled:{} optimized OM FS operations",
+            OZONE_OM_LAYOUT_VERSION, version, omLayoutVersionV1);
   }
 
   /**
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index d4c0f17..d2dd5c7 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.ozone.om.request.key.OMKeyCreateRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyDeleteRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyPurgeRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyRenameRequest;
+import org.apache.hadoop.ozone.om.request.key.OMKeyRenameRequestV1;
 import org.apache.hadoop.ozone.om.request.key.OMKeysRenameRequest;
 import org.apache.hadoop.ozone.om.request.key.OMTrashRecoverRequest;
 import org.apache.hadoop.ozone.om.request.key.acl.OMKeyAddAclRequest;
@@ -149,6 +150,9 @@ public final class OzoneManagerRatisUtils {
     case DeleteKeys:
       return new OMKeysDeleteRequest(omRequest);
     case RenameKey:
+      if (omLayoutVersionV1) {
+        return new OMKeyRenameRequestV1(omRequest);
+      }
       return new OMKeyRenameRequest(omRequest);
     case RenameKeys:
       return new OMKeysRenameRequest(omRequest);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java
index fd303e7..102ac1e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java
@@ -20,14 +20,18 @@ package org.apache.hadoop.ozone.om.request.bucket;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Optional;
 
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
@@ -156,6 +160,9 @@ public class OMBucketCreateRequest extends OMClientRequest {
         getOmRequest());
     OmBucketInfo omBucketInfo = OmBucketInfo.getFromProtobuf(bucketInfo);
 
+    // Add layout version V1 to bucket info
+    addLayoutVersionToBucket(ozoneManager, omBucketInfo);
+
     AuditLogger auditLogger = ozoneManager.getAuditLogger();
     OzoneManagerProtocolProtos.UserInfo userInfo = getOmRequest().getUserInfo();
 
@@ -335,4 +342,21 @@ public class OMBucketCreateRequest extends OMClientRequest {
 
   }
 
+  private void addLayoutVersionToBucket(OzoneManager ozoneManager,
+                                        OmBucketInfo omBucketInfo) {
+    Map<String, String> metadata = omBucketInfo.getMetadata();
+    if (metadata == null) {
+      metadata = new HashMap<>();
+    }
+    OzoneConfiguration configuration = ozoneManager.getConfiguration();
+    // TODO: Many unit test cases has null config and done a simple null
+    //  check now. It can be done later, to avoid massive test code changes.
+    if (configuration != null) {
+      String layOutVersion = configuration
+              .get(OMConfigKeys.OZONE_OM_LAYOUT_VERSION,
+                      OMConfigKeys.OZONE_OM_LAYOUT_VERSION_DEFAULT);
+      metadata.put(OMConfigKeys.OZONE_OM_LAYOUT_VERSION, layOutVersion);
+      omBucketInfo.setMetadata(metadata);
+    }
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
index 91be9a2..9ef924e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -696,4 +697,97 @@ public final class OMFileRequest {
     }
     return prefixName.concat(OzoneConsts.OZONE_URI_DELIMITER).concat(fileName);
   }
+
+  /**
+   * Build DirectoryInfo from OmKeyInfo.
+   *
+   * @param keyInfo omKeyInfo
+   * @return omDirectoryInfo object
+   */
+  public static OmDirectoryInfo getDirectoryInfo(OmKeyInfo keyInfo){
+    OmDirectoryInfo.Builder builder = new OmDirectoryInfo.Builder();
+    builder.setParentObjectID(keyInfo.getParentObjectID());
+    builder.setAcls(keyInfo.getAcls());
+    builder.addAllMetadata(keyInfo.getMetadata());
+    builder.setCreationTime(keyInfo.getCreationTime());
+    builder.setModificationTime(keyInfo.getModificationTime());
+    builder.setObjectID(keyInfo.getObjectID());
+    builder.setUpdateID(keyInfo.getUpdateID());
+    builder.setName(OzoneFSUtils.getFileName(keyInfo.getKeyName()));
+    return builder.build();
+  }
+
+  /**
+   * Verify that the given toKey directory is a sub directory of fromKey
+   * directory.
+   * <p>
+   * For example, special case of renaming a directory to its own
+   * sub-directory is not allowed.
+   *
+   * @param fromKeyName source path
+   * @param toKeyName   destination path
+   * @param isDir       true represents a directory type otw a file type
+   * @throws OMException if the dest dir is a sub-dir of source dir.
+   */
+  public static void verifyToDirIsASubDirOfFromDirectory(String fromKeyName,
+      String toKeyName, boolean isDir) throws OMException {
+    if (!isDir) {
+      return;
+    }
+    Path dstParent = Paths.get(toKeyName).getParent();
+    while (dstParent != null) {
+      if (Paths.get(fromKeyName).equals(dstParent)) {
+        throw new OMException("Cannot rename a directory to its own " +
+                "subdirectory", OMException.ResultCodes.KEY_RENAME_ERROR);
+        // TODO: Existing rename throws java.lang.IllegalArgumentException.
+        //       Should we throw same exception ?
+      }
+      dstParent = dstParent.getParent();
+    }
+  }
+
+  /**
+   * Verify parent exists for the destination path and return destination
+   * path parent Id.
+   * <p>
+   * Check whether dst parent dir exists or not. If the parent exists, then the
+   * source can be renamed to dst path.
+   *
+   * @param volumeName  volume name
+   * @param bucketName  bucket name
+   * @param toKeyName   destination path
+   * @param fromKeyName source path
+   * @param metaMgr     metadata manager
+   * @throws IOException if the destination parent dir doesn't exists.
+   */
+  public static long getToKeyNameParentId(String volumeName,
+      String bucketName, String toKeyName, String fromKeyName,
+      OMMetadataManager metaMgr) throws IOException {
+
+    int totalDirsCount = OzoneFSUtils.getFileCount(toKeyName);
+    // skip parent is root '/'
+    if (totalDirsCount <= 1) {
+      String bucketKey = metaMgr.getBucketKey(volumeName, bucketName);
+      OmBucketInfo omBucketInfo =
+              metaMgr.getBucketTable().get(bucketKey);
+      return omBucketInfo.getObjectID();
+    }
+
+    String toKeyParentDir = OzoneFSUtils.getParentDir(toKeyName);
+
+    OzoneFileStatus toKeyParentDirStatus = getOMKeyInfoIfExists(metaMgr,
+            volumeName, bucketName, toKeyParentDir, 0);
+    // check if the immediate parent exists
+    if (toKeyParentDirStatus == null) {
+      throw new OMException(String.format(
+              "Failed to rename %s to %s, %s doesn't exist", fromKeyName,
+              toKeyName, toKeyParentDir),
+              OMException.ResultCodes.KEY_RENAME_ERROR);
+    } else if (toKeyParentDirStatus.isFile()){
+      throw new OMException(String.format(
+              "Failed to rename %s to %s, %s is a file", fromKeyName, toKeyName,
+              toKeyParentDir), OMException.ResultCodes.KEY_RENAME_ERROR);
+    }
+    return toKeyParentDirStatus.getKeyInfo().getObjectID();
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestV1.java
new file mode 100644
index 0000000..74e53fe
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestV1.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.key;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.key.OMKeyRenameResponse;
+import org.apache.hadoop.ozone.om.response.key.OMKeyRenameResponseV1;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.*;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+
+/**
+ * Handles rename key request layout version V1.
+ */
+public class OMKeyRenameRequestV1 extends OMKeyRenameRequest {
+
+  private static final Logger LOG =
+          LoggerFactory.getLogger(OMKeyRenameRequestV1.class);
+
+  public OMKeyRenameRequestV1(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  @SuppressWarnings("methodlength")
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
+
+    RenameKeyRequest renameKeyRequest = getOmRequest().getRenameKeyRequest();
+    KeyArgs keyArgs = renameKeyRequest.getKeyArgs();
+    Map<String, String> auditMap = buildAuditMap(keyArgs, renameKeyRequest);
+
+    String volumeName = keyArgs.getVolumeName();
+    String bucketName = keyArgs.getBucketName();
+    String fromKeyName = keyArgs.getKeyName();
+    String toKeyName = renameKeyRequest.getToKeyName();
+
+    OMMetrics omMetrics = ozoneManager.getMetrics();
+    omMetrics.incNumKeyRenames();
+
+    AuditLogger auditLogger = ozoneManager.getAuditLogger();
+
+    OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
+            getOmRequest());
+
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    boolean acquiredLock = false;
+    OMClientResponse omClientResponse = null;
+    IOException exception = null;
+    OmKeyInfo fromKeyValue;
+    String fromKey = null;
+    Result result;
+    try {
+      if (toKeyName.length() == 0 || fromKeyName.length() == 0) {
+        throw new OMException("Key name is empty",
+                OMException.ResultCodes.INVALID_KEY_NAME);
+      }
+
+      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
+      volumeName = keyArgs.getVolumeName();
+      bucketName = keyArgs.getBucketName();
+
+      // check Acls to see if user has access to perform delete operation on
+      // old key and create operation on new key
+      checkKeyAcls(ozoneManager, volumeName, bucketName, fromKeyName,
+              IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
+      checkKeyAcls(ozoneManager, volumeName, bucketName, toKeyName,
+              IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
+
+      acquiredLock = omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
+              volumeName, bucketName);
+
+      // Validate bucket and volume exists or not.
+      validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+
+      // Check if fromKey exists
+      OzoneFileStatus fromKeyFileStatus =
+              OMFileRequest.getOMKeyInfoIfExists(omMetadataManager, volumeName,
+                      bucketName, fromKeyName, 0);
+      // case-1) fromKeyName should exist, otw throws exception
+      if (fromKeyFileStatus == null) {
+        // TODO: Add support for renaming open key
+        throw new OMException("Key not found " + fromKey, KEY_NOT_FOUND);
+      }
+
+      // source existed
+      fromKeyValue = fromKeyFileStatus.getKeyInfo();
+      boolean isRenameDirectory = fromKeyFileStatus.isDirectory();
+
+      // case-2) Cannot rename a directory to its own subdirectory
+      OMFileRequest.verifyToDirIsASubDirOfFromDirectory(fromKeyName,
+              toKeyName, fromKeyFileStatus.isDirectory());
+
+      OzoneFileStatus toKeyFileStatus =
+              OMFileRequest.getOMKeyInfoIfExists(omMetadataManager,
+                      volumeName, bucketName, toKeyName, 0);
+
+      // Check if toKey exists.
+      if(toKeyFileStatus != null) {
+        // Destination exists and following are different cases:
+        OmKeyInfo toKeyValue = toKeyFileStatus.getKeyInfo();
+
+        if (fromKeyValue.getKeyName().equals(toKeyValue.getKeyName())) {
+          // case-3) If src == destin then check source and destin of same type
+          // (a) If dst is a file then return true.
+          // (b) Otherwise throws exception.
+          // TODO: Discuss do we need to throw exception for file as well.
+          if (toKeyFileStatus.isFile()) {
+            result = Result.SUCCESS;
+          } else {
+            throw new OMException("Key already exists " + toKeyName,
+                    OMException.ResultCodes.KEY_ALREADY_EXISTS);
+          }
+        } else if (toKeyFileStatus.isDirectory()) {
+          // case-4) If dst is a directory then rename source as sub-path of it
+          // For example: rename /source to /dst will lead to /dst/source
+          String fromFileName = OzoneFSUtils.getFileName(fromKeyName);
+          String newToKeyName = OzoneFSUtils.appendFileNameToKeyPath(toKeyName,
+                  fromFileName);
+          OzoneFileStatus newToOzoneFileStatus =
+                  OMFileRequest.getOMKeyInfoIfExists(omMetadataManager,
+                          volumeName, bucketName, newToKeyName, 0);
+
+          if (newToOzoneFileStatus != null) {
+            // case-5) If new destin '/dst/source' exists then throws exception
+            throw new OMException(String.format(
+                    "Failed to rename %s to %s, file already exists or not " +
+                            "empty!", fromKeyName, newToKeyName),
+                    OMException.ResultCodes.KEY_ALREADY_EXISTS);
+          }
+
+          omClientResponse = renameKey(toKeyValue.getObjectID(), trxnLogIndex,
+                  fromKeyValue, isRenameDirectory, newToKeyName,
+                  keyArgs.getModificationTime(), omResponse, ozoneManager);
+          result = Result.SUCCESS;
+        } else {
+          // case-6) If destination is a file type and if exists then throws
+          // key already exists exception.
+          throw new OMException("Failed to rename, key already exists "
+                  + toKeyName, OMException.ResultCodes.KEY_ALREADY_EXISTS);
+        }
+      } else {
+        // Destination doesn't exist and the cases are:
+        // case-7) Check whether dst parent dir exists or not. If parent
+        // doesn't exist then throw exception, otw the source can be renamed to
+        // destination path.
+        long toKeyParentId = OMFileRequest.getToKeyNameParentId(volumeName,
+                bucketName, toKeyName, fromKeyName, omMetadataManager);
+
+        omClientResponse = renameKey(toKeyParentId, trxnLogIndex,
+                fromKeyValue, isRenameDirectory, toKeyName,
+                keyArgs.getModificationTime(), omResponse, ozoneManager);
+
+        result = Result.SUCCESS;
+      }
+    } catch (IOException ex) {
+      result = Result.FAILURE;
+      exception = ex;
+      omClientResponse = new OMKeyRenameResponse(createErrorOMResponse(
+              omResponse, exception));
+    } finally {
+      addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
+              omDoubleBufferHelper);
+      if (acquiredLock) {
+        omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
+                bucketName);
+      }
+    }
+
+    auditLog(auditLogger, buildAuditMessage(OMAction.RENAME_KEY, auditMap,
+            exception, getOmRequest().getUserInfo()));
+
+    switch (result) {
+    case SUCCESS:
+      LOG.debug("Rename Key is successfully completed for volume:{} bucket:{}" +
+                      " fromKey:{} toKey:{}. ", volumeName, bucketName,
+              fromKeyName, toKeyName);
+      break;
+    case FAILURE:
+      ozoneManager.getMetrics().incNumKeyRenameFails();
+      LOG.error("Rename key failed for volume:{} bucket:{} fromKey:{} " +
+                      "toKey:{}. Key: {} not found.", volumeName, bucketName,
+              fromKeyName, toKeyName, fromKeyName);
+      break;
+    default:
+      LOG.error("Unrecognized Result for OMKeyRenameRequest: {}",
+              renameKeyRequest);
+    }
+    return omClientResponse;
+  }
+
+  @SuppressWarnings("parameternumber")
+  private OMClientResponse renameKey(long toKeyParentId,
+      long trxnLogIndex, OmKeyInfo fromKeyValue, boolean isRenameDirectory,
+      String toKeyName, long modificationTime, OMResponse.Builder omResponse,
+      OzoneManager ozoneManager) {
+
+    String dbFromKey = fromKeyValue.getPath();
+    String toKeyFileName = OzoneFSUtils.getFileName(toKeyName);
+
+    fromKeyValue.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
+    // Set toFileName
+    fromKeyValue.setKeyName(toKeyFileName);
+    fromKeyValue.setFileName(toKeyFileName);
+    // Set toKeyObjectId
+    fromKeyValue.setParentObjectID(toKeyParentId);
+    //Set modification time
+    fromKeyValue.setModificationTime(modificationTime);
+
+    // destination dbKeyName
+    String dbToKey = fromKeyValue.getPath();
+
+    // Add to cache.
+    // dbFromKey should be deleted, dbToKey should be added with newly updated
+    // omKeyInfo.
+    // Add from_key and to_key details into cache.
+    OMMetadataManager metadataMgr = ozoneManager.getMetadataManager();
+    if (isRenameDirectory) {
+      Table<String, OmDirectoryInfo> dirTable = metadataMgr.getDirectoryTable();
+      dirTable.addCacheEntry(new CacheKey<>(dbFromKey),
+              new CacheValue<>(Optional.absent(), trxnLogIndex));
+
+      dirTable.addCacheEntry(new CacheKey<>(dbToKey),
+              new CacheValue<>(Optional.of(OMFileRequest.
+                              getDirectoryInfo(fromKeyValue)), trxnLogIndex));
+    } else {
+      Table<String, OmKeyInfo> keyTable = metadataMgr.getKeyTable();
+
+      keyTable.addCacheEntry(new CacheKey<>(dbFromKey),
+              new CacheValue<>(Optional.absent(), trxnLogIndex));
+
+      keyTable.addCacheEntry(new CacheKey<>(dbToKey),
+              new CacheValue<>(Optional.of(fromKeyValue), trxnLogIndex));
+    }
+
+    OMClientResponse omClientResponse = new OMKeyRenameResponseV1(omResponse
+            .setRenameKeyResponse(RenameKeyResponse.newBuilder()).build(),
+            dbFromKey, dbToKey, fromKeyValue, isRenameDirectory);
+    return omClientResponse;
+  }
+
+  private Map<String, String> buildAuditMap(
+          KeyArgs keyArgs, RenameKeyRequest renameKeyRequest) {
+    Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
+    auditMap.remove(OzoneConsts.KEY);
+    auditMap.put(OzoneConsts.SRC_KEY, keyArgs.getKeyName());
+    auditMap.put(OzoneConsts.DST_KEY, renameKeyRequest.getToKeyName());
+    return auditMap;
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyRenameResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyRenameResponse.java
index 7470b37..3b7edf1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyRenameResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyRenameResponse.java
@@ -70,4 +70,15 @@ public class OMKeyRenameResponse extends OMClientResponse {
         renameKeyInfo);
   }
 
+  public OmKeyInfo getRenameKeyInfo() {
+    return renameKeyInfo;
+  }
+
+  public String getFromKeyName() {
+    return fromKeyName;
+  }
+
+  public String getToKeyName() {
+    return toKeyName;
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyRenameResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyRenameResponseV1.java
similarity index 50%
copy from hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyRenameResponse.java
copy to hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyRenameResponseV1.java
index 7470b37..7a9b159 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyRenameResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyRenameResponseV1.java
@@ -18,56 +18,52 @@
 
 package org.apache.hadoop.ozone.om.response.key;
 
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
-import org.apache.hadoop.ozone.om.response.OMClientResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .OMResponse;
-import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 
-import java.io.IOException;
 import javax.annotation.Nonnull;
+import java.io.IOException;
 
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.*;
 
 /**
- * Response for RenameKey request.
+ * Response for RenameKey request layout version V1.
  */
-@CleanupTableInfo(cleanupTables = {KEY_TABLE})
-public class OMKeyRenameResponse extends OMClientResponse {
-
-  private String fromKeyName;
-  private String toKeyName;
-  private OmKeyInfo renameKeyInfo;
+@CleanupTableInfo(cleanupTables = {FILE_TABLE, DIRECTORY_TABLE})
+public class OMKeyRenameResponseV1 extends OMKeyRenameResponse {
 
-  public OMKeyRenameResponse(@Nonnull OMResponse omResponse,
-      String fromKeyName, String toKeyName, @Nonnull OmKeyInfo renameKeyInfo) {
-    super(omResponse);
-    this.fromKeyName = fromKeyName;
-    this.toKeyName = toKeyName;
-    this.renameKeyInfo = renameKeyInfo;
-  }
+  private boolean isRenameDirectory;
 
-  /**
-   * For when the request is not successful.
-   * For a successful request, the other constructor should be used.
-   */
-  public OMKeyRenameResponse(@Nonnull OMResponse omResponse) {
-    super(omResponse);
-    checkStatusNotOK();
+  public OMKeyRenameResponseV1(@Nonnull OMResponse omResponse,
+      String fromKeyName, String toKeyName, @Nonnull OmKeyInfo renameKeyInfo,
+      boolean isRenameDirectory) {
+    super(omResponse, fromKeyName, toKeyName, renameKeyInfo);
+    this.isRenameDirectory = isRenameDirectory;
   }
 
   @Override
   public void addToDBBatch(OMMetadataManager omMetadataManager,
-      BatchOperation batchOperation) throws IOException {
-    String volumeName = renameKeyInfo.getVolumeName();
-    String bucketName = renameKeyInfo.getBucketName();
-    omMetadataManager.getKeyTable().deleteWithBatch(batchOperation,
-        omMetadataManager.getOzoneKey(volumeName, bucketName, fromKeyName));
-    omMetadataManager.getKeyTable().putWithBatch(batchOperation,
-        omMetadataManager.getOzoneKey(volumeName, bucketName, toKeyName),
-        renameKeyInfo);
-  }
+                           BatchOperation batchOperation) throws IOException {
+
+    if (isRenameDirectory) {
+      omMetadataManager.getDirectoryTable().deleteWithBatch(batchOperation,
+              getFromKeyName());
 
+      OmDirectoryInfo renameDirInfo =
+              OMFileRequest.getDirectoryInfo(getRenameKeyInfo());
+      omMetadataManager.getDirectoryTable().putWithBatch(batchOperation,
+              getToKeyName(), renameDirInfo);
+
+    } else {
+      omMetadataManager.getKeyTable().deleteWithBatch(batchOperation,
+              getFromKeyName());
+      omMetadataManager.getKeyTable().putWithBatch(batchOperation,
+              getToKeyName(), getRenameKeyInfo());
+    }
+  }
 }
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
index 0b50988..5ba7211 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.OzoneKey;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -521,4 +522,8 @@ public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter {
     return blockLocations;
   }
 
+  @Override
+  public String getBucketLayoutVersion() {
+    return bucket.getMetadata().get(OMConfigKeys.OZONE_OM_LAYOUT_VERSION);
+  }
 }
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
index e4acabc..c20837b 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.annotation.InterfaceStability;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -316,6 +317,7 @@ public class BasicOzoneFileSystem extends FileSystem {
 
     String srcPath = src.toUri().getPath();
     String dstPath = dst.toUri().getPath();
+    // TODO: Discuss do we need to throw exception.
     if (srcPath.equals(dstPath)) {
       return true;
     }
@@ -327,6 +329,12 @@ public class BasicOzoneFileSystem extends FileSystem {
       return false;
     }
 
+    String layOutVersion = adapter.getBucketLayoutVersion();
+    if (layOutVersion != null &&
+            OMConfigKeys.OZONE_OM_LAYOUT_VERSION_V1.equals(layOutVersion)) {
+      return renameV1(srcPath, dstPath);
+    }
+
     // Check if the source exists
     FileStatus srcStatus;
     try {
@@ -407,6 +415,11 @@ public class BasicOzoneFileSystem extends FileSystem {
     return result;
   }
 
+  private boolean renameV1(String srcPath, String dstPath) throws IOException {
+    adapter.renameKey(srcPath, dstPath);
+    return true;
+  }
+
   /**
    * Intercept rename to trash calls from TrashPolicyDefault,
    * convert them to delete calls instead.
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
index 848119d..1d749d8 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.ozone.client.OzoneKey;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -1028,4 +1029,10 @@ public class BasicRootedOzoneClientAdapterImpl
         null, null, null, new BlockLocation[0]
     );
   }
+
+  @Override
+  public String getBucketLayoutVersion() {
+    // TODO: Need to refine this part.
+    return OMConfigKeys.OZONE_OM_LAYOUT_VERSION_DEFAULT;
+  }
 }
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
index 2b76c22..be93bd6 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
@@ -75,4 +75,5 @@ public interface OzoneClientAdapter {
   FileStatusAdapter getFileStatus(String key, URI uri,
       Path qualifiedPath, String userName) throws IOException;
 
+  String getBucketLayoutVersion();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org