You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by na...@apache.org on 2022/06/01 02:37:38 UTC

[ozone] branch master updated: HDDS-6768. Add Volume and Bucket ID to the key path for FSO Objects (#3449)

This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a4d2d4723 HDDS-6768. Add Volume and Bucket ID to the key path for FSO Objects (#3449)
3a4d2d4723 is described below

commit 3a4d2d47236f549abddb218a82dc917088ee31a2
Author: Nandakumar <na...@gmail.com>
AuthorDate: Wed Jun 1 08:07:33 2022 +0530

    HDDS-6768. Add Volume and Bucket ID to the key path for FSO Objects (#3449)
---
 .../main/java/org/apache/hadoop/ozone/OmUtils.java |   2 +-
 .../fs/ozone/TestOzoneFileSystemWithFSO.java       |  45 ++-
 .../rpc/TestOzoneClientMultipartUploadWithFSO.java |  16 +-
 .../hadoop/ozone/om/TestObjectStoreWithFSO.java    |  28 +-
 .../src/main/proto/OmClientProtocol.proto          |  22 +-
 .../interface-client/src/main/resources/proto.lock | 324 ++++++++++++++++++++-
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |  35 ++-
 .../hadoop/ozone/om/DirectoryDeletingService.java  |  59 ++--
 .../org/apache/hadoop/ozone/om/KeyManager.java     |  12 +-
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  71 +++--
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  61 +++-
 .../hadoop/ozone/om/OzoneListStatusHelper.java     |  35 ++-
 .../om/ratis/utils/OzoneManagerRatisUtils.java     |   6 +-
 .../file/OMDirectoryCreateRequestWithFSO.java      |  11 +-
 .../request/file/OMFileCreateRequestWithFSO.java   |  17 +-
 .../ozone/om/request/file/OMFileRequest.java       |  91 ++++--
 .../request/key/OMAllocateBlockRequestWithFSO.java |  15 +-
 ....java => OMDirectoriesPurgeRequestWithFSO.java} |  23 +-
 .../om/request/key/OMKeyCommitRequestWithFSO.java  |  16 +-
 .../om/request/key/OMKeyCreateRequestWithFSO.java  |  23 +-
 .../om/request/key/OMKeyDeleteRequestWithFSO.java  |   8 +-
 .../om/request/key/OMKeyRenameRequestWithFSO.java  |  14 +-
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  |   6 +-
 .../ozone/om/request/key/OMKeysDeleteRequest.java  |   3 +-
 .../om/request/key/OmKeysDeleteRequestWithFSO.java |  25 +-
 .../om/request/key/acl/OMKeyAclRequestWithFSO.java |   5 +-
 .../S3InitiateMultipartUploadRequestWithFSO.java   |  14 +-
 .../S3MultipartUploadAbortRequestWithFSO.java      |  15 +-
 .../S3MultipartUploadCommitPartRequestWithFSO.java |  13 +-
 .../S3MultipartUploadCompleteRequestWithFSO.java   |  13 +-
 .../file/OMDirectoryCreateResponseWithFSO.java     |  13 +-
 .../response/file/OMFileCreateResponseWithFSO.java |   7 +-
 .../key/OMDirectoriesPurgeResponseWithFSO.java     | 129 ++++++++
 .../response/key/OMKeyDeleteResponseWithFSO.java   |  11 +-
 .../response/key/OMKeysDeleteResponseWithFSO.java  |  12 +-
 .../response/key/OMPathsPurgeResponseWithFSO.java  | 125 --------
 .../response/key/acl/OMKeyAclResponseWithFSO.java  |   9 +-
 .../S3InitiateMultipartUploadResponseWithFSO.java  |   9 +-
 .../hadoop/ozone/om/TestOmMetadataManager.java     |   8 +-
 .../ozone/om/request/OMRequestTestUtils.java       |  61 ++--
 .../request/TestBucketLayoutAwareOMKeyFactory.java |   9 +-
 .../file/TestOMDirectoryCreateRequestWithFSO.java  |  91 +++---
 .../file/TestOMFileCreateRequestWithFSO.java       |  25 +-
 .../key/TestOMAllocateBlockRequestWithFSO.java     |  16 +-
 .../om/request/key/TestOMKeyAclRequestWithFSO.java |   8 +-
 .../om/request/key/TestOMKeyCommitRequest.java     |   6 +-
 .../request/key/TestOMKeyCommitRequestWithFSO.java |  12 +-
 .../request/key/TestOMKeyCreateRequestWithFSO.java |  44 ++-
 .../request/key/TestOMKeyDeleteRequestWithFSO.java |   8 +-
 .../request/key/TestOMOpenKeysDeleteRequest.java   |  57 ++--
 ...estS3InitiateMultipartUploadRequestWithFSO.java |  24 +-
 .../TestS3MultipartUploadAbortRequest.java         |   2 +-
 .../TestS3MultipartUploadAbortRequestWithFSO.java  |  10 +-
 .../TestS3MultipartUploadCommitPartRequest.java    |   5 +-
 ...tS3MultipartUploadCommitPartRequestWithFSO.java |  18 +-
 ...estS3MultipartUploadCompleteRequestWithFSO.java |  26 +-
 .../file/TestOMDirectoryCreateResponseWithFSO.java |  51 +++-
 .../file/TestOMFileCreateResponseWithFSO.java      |   8 +-
 .../key/TestOMAllocateBlockResponseWithFSO.java    |   5 +-
 .../om/response/key/TestOMKeyCommitResponse.java   |   4 +-
 .../key/TestOMKeyCommitResponseWithFSO.java        |  18 +-
 .../key/TestOMKeyCreateResponseWithFSO.java        |   8 +-
 .../om/response/key/TestOMKeyDeleteResponse.java   |  20 +-
 .../key/TestOMKeyDeleteResponseWithFSO.java        |   5 +-
 .../ozone/om/response/key/TestOMKeyResponse.java   |  32 +-
 .../response/key/TestOMOpenKeysDeleteResponse.java |  14 +-
 ...stS3InitiateMultipartUploadResponseWithFSO.java |  10 +-
 .../s3/multipart/TestS3MultipartResponse.java      |  64 +++-
 .../TestS3MultipartUploadAbortResponse.java        |   8 +-
 .../TestS3MultipartUploadAbortResponseWithFSO.java |  13 +-
 ...S3MultipartUploadCommitPartResponseWithFSO.java |  26 +-
 ...stS3MultipartUploadCompleteResponseWithFSO.java |  33 ++-
 .../hadoop/ozone/recon/api/NSSummaryEndpoint.java  |  31 +-
 .../ozone/recon/OMMetadataManagerTestUtils.java    |  15 +-
 .../ozone/recon/api/TestNSSummaryEndpoint.java     |  26 +-
 .../ozone/recon/tasks/TestNSSummaryTask.java       |  18 +-
 .../apache/hadoop/ozone/debug/PrefixParser.java    |  27 +-
 77 files changed, 1577 insertions(+), 632 deletions(-)

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index c1c522ede5..7e96cd6556 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -302,7 +302,7 @@ public final class OmUtils {
     case DeleteOpenKeys:
     case SetS3Secret:
     case RevokeS3Secret:
-    case PurgePaths:
+    case PurgeDirectories:
     case CreateTenant:
     case DeleteTenant:
     case TenantAssignUserAccessId:
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithFSO.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithFSO.java
index 70193a3c8c..83fa1e76a6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithFSO.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithFSO.java
@@ -380,14 +380,21 @@ public class TestOzoneFileSystemWithFSO extends TestOzoneFileSystem {
         .get(omMgr.getBucketKey(getVolumeName(), getBucketName()));
     Assert.assertNotNull("Failed to find bucketInfo", omBucketInfo);
 
+    final long volumeId = omMgr.getVolumeId(getVolumeName());
+    final long bucketId = omMgr.getBucketId(getVolumeName(), getBucketName());
+
     ArrayList<String> dirKeys = new ArrayList<>();
     long d1ObjectID =
-        verifyDirKey(omBucketInfo.getObjectID(), "d1", "/d1", dirKeys, omMgr);
-    long d2ObjectID = verifyDirKey(d1ObjectID, "d2", "/d1/d2", dirKeys, omMgr);
+        verifyDirKey(volumeId, bucketId, omBucketInfo.getObjectID(),
+                "d1", "/d1", dirKeys, omMgr);
+    long d2ObjectID = verifyDirKey(volumeId, bucketId, d1ObjectID,
+            "d2", "/d1/d2", dirKeys, omMgr);
     long d3ObjectID =
-        verifyDirKey(d2ObjectID, "d3", "/d1/d2/d3", dirKeys, omMgr);
+        verifyDirKey(volumeId, bucketId, d2ObjectID,
+                "d3", "/d1/d2/d3", dirKeys, omMgr);
     long d4ObjectID =
-        verifyDirKey(d3ObjectID, "d4", "/d1/d2/d3/d4", dirKeys, omMgr);
+        verifyDirKey(volumeId, bucketId, d3ObjectID,
+                "d4", "/d1/d2/d3/d4", dirKeys, omMgr);
 
     Assert.assertEquals("Wrong OM numKeys metrics", 4,
         getCluster().getOzoneManager().getMetrics().getNumKeys());
@@ -398,9 +405,11 @@ public class TestOzoneFileSystemWithFSO extends TestOzoneFileSystem {
     Path subDir6 = new Path("/d1/d2/d3/d4/d6");
     getFs().mkdirs(subDir6);
     long d5ObjectID =
-        verifyDirKey(d4ObjectID, "d5", "/d1/d2/d3/d4/d5", dirKeys, omMgr);
+        verifyDirKey(volumeId, bucketId, d4ObjectID,
+                "d5", "/d1/d2/d3/d4/d5", dirKeys, omMgr);
     long d6ObjectID =
-        verifyDirKey(d4ObjectID, "d6", "/d1/d2/d3/d4/d6", dirKeys, omMgr);
+        verifyDirKey(volumeId, bucketId, d4ObjectID,
+                "d6", "/d1/d2/d3/d4/d6", dirKeys, omMgr);
     Assert.assertTrue(
         "Wrong objectIds for sub-dirs[" + d5ObjectID + "/d5, " + d6ObjectID
             + "/d6] of same parent!", d5ObjectID != d6ObjectID);
@@ -424,10 +433,18 @@ public class TestOzoneFileSystemWithFSO extends TestOzoneFileSystem {
     Assert.assertNotNull("Failed to find bucketInfo", omBucketInfo);
 
     ArrayList<String> dirKeys = new ArrayList<>();
+
+    final long volumeId = omMgr.getVolumeId(getVolumeName());
+    final long bucketId = omMgr.getBucketId(getVolumeName(), getBucketName());
     long d1ObjectID =
-        verifyDirKey(omBucketInfo.getObjectID(), "d1", "/d1", dirKeys, omMgr);
-    long d2ObjectID = verifyDirKey(d1ObjectID, "d2", "/d1/d2", dirKeys, omMgr);
-    openFileKey = d2ObjectID + OzoneConsts.OM_KEY_PREFIX + file.getName();
+        verifyDirKey(volumeId, bucketId, omBucketInfo.getObjectID(),
+                "d1", "/d1", dirKeys, omMgr);
+    long d2ObjectID = verifyDirKey(volumeId, bucketId, d1ObjectID,
+            "d2", "/d1/d2", dirKeys, omMgr);
+    openFileKey = OzoneConsts.OM_KEY_PREFIX + volumeId +
+            OzoneConsts.OM_KEY_PREFIX + bucketId +
+            OzoneConsts.OM_KEY_PREFIX + d2ObjectID +
+            OzoneConsts.OM_KEY_PREFIX + file.getName();
 
     // trigger CommitKeyRequest
     outputStream.close();
@@ -457,10 +474,12 @@ public class TestOzoneFileSystemWithFSO extends TestOzoneFileSystem {
     Assert.assertEquals("Wrong path format", dbKey, omKeyInfo.getPath());
   }
 
-  long verifyDirKey(long parentId, String dirKey, String absolutePath,
-      ArrayList<String> dirKeys, OMMetadataManager omMgr)
+  long verifyDirKey(long volumeId, long bucketId, long parentId,
+                    String dirKey, String absolutePath,
+                    ArrayList<String> dirKeys, OMMetadataManager omMgr)
       throws Exception {
-    String dbKey = parentId + "/" + dirKey;
+    String dbKey = "/" + volumeId + "/" + bucketId + "/" +
+            parentId + "/" + dirKey;
     dirKeys.add(dbKey);
     OmDirectoryInfo dirInfo = omMgr.getDirectoryTable().get(dbKey);
     Assert.assertNotNull("Failed to find " + absolutePath +
@@ -473,8 +492,6 @@ public class TestOzoneFileSystemWithFSO extends TestOzoneFileSystem {
         dirInfo.getCreationTime() > 0);
     Assert.assertEquals("Mismatches directory modification time param",
         dirInfo.getCreationTime(), dirInfo.getModificationTime());
-    Assert.assertEquals("Wrong representation!",
-        dbKey + ":" + dirInfo.getObjectID(), dirInfo.toString());
     return dirInfo.getObjectID();
   }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientMultipartUploadWithFSO.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientMultipartUploadWithFSO.java
index 293725f508..262b444cc9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientMultipartUploadWithFSO.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientMultipartUploadWithFSO.java
@@ -978,11 +978,14 @@ public class TestOzoneClientMultipartUploadWithFSO {
       OMMetadataManager omMetadataManager) throws IOException {
 
     String fileName = OzoneFSUtils.getFileName(keyName);
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
     long parentID = getParentID(volumeName, bucketName, keyName,
         omMetadataManager);
 
-    String multipartKey = omMetadataManager.getMultipartKey(parentID,
-        fileName, multipartUploadID);
+    String multipartKey = omMetadataManager.getMultipartKey(volumeId, bucketId,
+            parentID, fileName, multipartUploadID);
 
     return multipartKey;
   }
@@ -990,11 +993,10 @@ public class TestOzoneClientMultipartUploadWithFSO {
   private long getParentID(String volumeName, String bucketName,
       String keyName, OMMetadataManager omMetadataManager) throws IOException {
     Iterator<Path> pathComponents = Paths.get(keyName).iterator();
-    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
-    OmBucketInfo omBucketInfo =
-        omMetadataManager.getBucketTable().get(bucketKey);
-    long bucketId = omBucketInfo.getObjectID();
-    return OMFileRequest.getParentID(bucketId, pathComponents,
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
+    return OMFileRequest.getParentID(volumeId, bucketId, pathComponents,
         keyName, omMetadataManager);
   }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestObjectStoreWithFSO.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestObjectStoreWithFSO.java
index a0ba00a354..af82b7cab6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestObjectStoreWithFSO.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestObjectStoreWithFSO.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.Assert;
@@ -70,7 +69,6 @@ import java.util.concurrent.TimeoutException;
 import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
 import static org.apache.hadoop.hdds.client.ReplicationType.RATIS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_ITERATE_BATCH_SIZE;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_SCHEME;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_ALREADY_EXISTS;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
@@ -785,17 +783,16 @@ public class TestObjectStoreWithFSO {
   private OmDirectoryInfo getDirInfo(String parentKey) throws Exception {
     OMMetadataManager omMetadataManager =
             cluster.getOzoneManager().getMetadataManager();
-    long bucketId = OMRequestTestUtils.getBucketId(volumeName, bucketName,
-            omMetadataManager);
+    long volumeId = omMetadataManager.getVolumeId(volumeName);
+    long bucketId = omMetadataManager.getBucketId(volumeName, bucketName);
     String[] pathComponents = StringUtils.split(parentKey, '/');
     long parentId = bucketId;
     OmDirectoryInfo dirInfo = null;
     for (int indx = 0; indx < pathComponents.length; indx++) {
       String pathElement = pathComponents[indx];
-      String dbKey = omMetadataManager.getOzonePathKey(parentId,
-              pathElement);
-      dirInfo =
-              omMetadataManager.getDirectoryTable().get(dbKey);
+      String dbKey = omMetadataManager.getOzonePathKey(volumeId,
+              bucketId, parentId, pathElement);
+      dirInfo = omMetadataManager.getDirectoryTable().get(dbKey);
       parentId = dirInfo.getObjectID();
     }
     return dirInfo;
@@ -804,7 +801,12 @@ public class TestObjectStoreWithFSO {
   private void verifyKeyInFileTable(Table<String, OmKeyInfo> fileTable,
       String fileName, long parentID, boolean isEmpty) throws IOException {
 
-    String dbFileKey = parentID + OM_KEY_PREFIX + fileName;
+    final OMMetadataManager omMetadataManager =
+            cluster.getOzoneManager().getMetadataManager();
+    final String dbFileKey = omMetadataManager.getOzonePathKey(
+            omMetadataManager.getVolumeId(volumeName),
+            omMetadataManager.getBucketId(volumeName, bucketName),
+            parentID, fileName);
     OmKeyInfo omKeyInfo = fileTable.get(dbFileKey);
     if (isEmpty) {
       Assert.assertNull("Table is not empty!", omKeyInfo);
@@ -822,8 +824,12 @@ public class TestObjectStoreWithFSO {
   private void verifyKeyInOpenFileTable(Table<String, OmKeyInfo> openFileTable,
       long clientID, String fileName, long parentID, boolean isEmpty)
           throws IOException, TimeoutException, InterruptedException {
-    String dbOpenFileKey =
-            parentID + OM_KEY_PREFIX + fileName + OM_KEY_PREFIX + clientID;
+    final OMMetadataManager omMetadataManager =
+            cluster.getOzoneManager().getMetadataManager();
+    final String dbOpenFileKey = omMetadataManager.getOpenFileName(
+            omMetadataManager.getVolumeId(volumeName),
+            omMetadataManager.getBucketId(volumeName, bucketName),
+            parentID, fileName, clientID);
 
     if (isEmpty) {
       // wait for DB updates
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index c6a6745d5f..60a03545a1 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -101,7 +101,7 @@ enum Type {
 
   RevokeS3Secret = 93;
 
-  PurgePaths = 94;
+  PurgeDirectories = 94;
 
   CreateTenant = 96;
   DeleteTenant = 97;
@@ -201,7 +201,7 @@ message OMRequest {
 
   optional RevokeS3SecretRequest            RevokeS3SecretRequest          = 93;
 
-  optional PurgePathsRequest                purgePathsRequest              = 94;
+  optional PurgeDirectoriesRequest          purgeDirectoriesRequest        = 94;
 
   optional S3Authentication                 s3Authentication               = 95;
 
@@ -297,7 +297,7 @@ message OMResponse {
 
   optional ListTrashResponse                  listTrashResponse            = 91;
   optional RecoverTrashResponse               RecoverTrashResponse         = 92;
-  optional PurgePathsResponse                 purgePathsResponse           = 93;
+  optional PurgeDirectoriesResponse           purgeDirectoriesResponse     = 93;
 
   // Skipped 94/95 to align with OMRequest
   optional CreateTenantResponse              CreateTenantResponse          = 96;
@@ -1080,13 +1080,19 @@ message PurgeKeysResponse {
 
 }
 
-message PurgePathsRequest {
-    repeated string deletedDirs = 1;
-    repeated KeyInfo deletedSubFiles = 2;
-    repeated KeyInfo markDeletedSubDirs = 3;
+message PurgeDirectoriesRequest {
+  repeated PurgePathRequest deletedPath = 1;
 }
 
-message PurgePathsResponse {
+message PurgePathRequest {
+    required uint64 volumeId = 1;
+    required uint64 bucketId = 2;
+    optional string deletedDir = 3;
+    repeated KeyInfo deletedSubFiles = 4;
+    repeated KeyInfo markDeletedSubDirs = 5;
+}
+
+message PurgeDirectoriesResponse {
 
 }
 
diff --git a/hadoop-ozone/interface-client/src/main/resources/proto.lock b/hadoop-ozone/interface-client/src/main/resources/proto.lock
index e6d24a8950..367bf166d7 100644
--- a/hadoop-ozone/interface-client/src/main/resources/proto.lock
+++ b/hadoop-ozone/interface-client/src/main/resources/proto.lock
@@ -1,5 +1,161 @@
 {
   "definitions": [
+    {
+      "protopath": "OMAdminProtocol.proto",
+      "def": {
+        "enums": [
+          {
+            "name": "NodeState",
+            "enum_fields": [
+              {
+                "name": "ACTIVE",
+                "integer": 1
+              },
+              {
+                "name": "DECOMMISSIONED",
+                "integer": 2
+              }
+            ]
+          }
+        ],
+        "messages": [
+          {
+            "name": "OMConfigurationRequest"
+          },
+          {
+            "name": "OMConfigurationResponse",
+            "fields": [
+              {
+                "id": 1,
+                "name": "success",
+                "type": "bool"
+              },
+              {
+                "id": 2,
+                "name": "errorMsg",
+                "type": "string"
+              },
+              {
+                "id": 3,
+                "name": "nodesInMemory",
+                "type": "OMNodeInfo",
+                "is_repeated": true
+              },
+              {
+                "id": 4,
+                "name": "nodesInNewConf",
+                "type": "OMNodeInfo",
+                "is_repeated": true
+              }
+            ]
+          },
+          {
+            "name": "OMNodeInfo",
+            "fields": [
+              {
+                "id": 1,
+                "name": "nodeID",
+                "type": "string"
+              },
+              {
+                "id": 2,
+                "name": "hostAddress",
+                "type": "string"
+              },
+              {
+                "id": 3,
+                "name": "rpcPort",
+                "type": "uint32"
+              },
+              {
+                "id": 4,
+                "name": "ratisPort",
+                "type": "uint32"
+              },
+              {
+                "id": 5,
+                "name": "nodeState",
+                "type": "NodeState",
+                "options": [
+                  {
+                    "name": "default",
+                    "value": "ACTIVE"
+                  }
+                ]
+              }
+            ]
+          },
+          {
+            "name": "DecommissionOMRequest",
+            "fields": [
+              {
+                "id": 1,
+                "name": "nodeId",
+                "type": "string"
+              },
+              {
+                "id": 2,
+                "name": "nodeAddress",
+                "type": "string"
+              }
+            ]
+          },
+          {
+            "name": "DecommissionOMResponse",
+            "fields": [
+              {
+                "id": 1,
+                "name": "success",
+                "type": "bool"
+              },
+              {
+                "id": 3,
+                "name": "errorMsg",
+                "type": "string"
+              }
+            ]
+          }
+        ],
+        "services": [
+          {
+            "name": "OzoneManagerAdminService",
+            "rpcs": [
+              {
+                "name": "getOMConfiguration",
+                "in_type": "OMConfigurationRequest",
+                "out_type": "OMConfigurationResponse"
+              },
+              {
+                "name": "decommission",
+                "in_type": "DecommissionOMRequest",
+                "out_type": "DecommissionOMResponse"
+              }
+            ]
+          }
+        ],
+        "package": {
+          "name": "hadoop.ozone"
+        },
+        "options": [
+          {
+            "name": "java_package",
+            "value": "org.apache.hadoop.ozone.protocol.proto"
+          },
+          {
+            "name": "java_outer_classname",
+            "value": "OzoneManagerAdminProtocolProtos"
+          },
+          {
+            "name": "java_generic_services",
+            "value": "true"
+          },
+          {
+            "name": "java_generate_equals_and_hash",
+            "value": "true"
+          }
+        ]
+      }
+    },
     {
       "protopath": "OmClientProtocol.proto",
       "def": {
@@ -212,7 +368,7 @@
                 "integer": 93
               },
               {
-                "name": "PurgePaths",
+                "name": "PurgeDirectories",
                 "integer": 94
               }
             ]
@@ -503,6 +659,10 @@
               {
                 "name": "NOT_SUPPORTED_OPERATION_WHEN_PREPARED",
                 "integer": 74
+              },
+              {
+                "name": "NOT_SUPPORTED_OPERATION_PRIOR_FINALIZATION",
+                "integer": 75
               }
             ]
           },
@@ -1016,8 +1176,13 @@
               },
               {
                 "id": 94,
-                "name": "purgePathsRequest",
-                "type": "PurgePathsRequest"
+                "name": "purgeDirectoriesRequest",
+                "type": "PurgeDirectoriesRequest"
+              },
+              {
+                "id": 95,
+                "name": "s3Authentication",
+                "type": "S3Authentication"
               }
             ]
           },
@@ -1307,8 +1472,8 @@
               },
               {
                 "id": 93,
-                "name": "purgePathsResponse",
-                "type": "PurgePathsResponse"
+                "name": "purgeDirectoriesResponse",
+                "type": "PurgeDirectoriesResponse"
               }
             ]
           },
@@ -1795,6 +1960,16 @@
                 "id": 18,
                 "name": "bucketLayout",
                 "type": "BucketLayoutProto"
+              },
+              {
+                "id": 19,
+                "name": "owner",
+                "type": "string"
+              },
+              {
+                "id": 20,
+                "name": "defaultReplicationConfig",
+                "type": "hadoop.hdds.DefaultReplicationConfig"
               }
             ]
           },
@@ -1941,6 +2116,16 @@
                 "id": 9,
                 "name": "quotaInNamespace",
                 "type": "uint64"
+              },
+              {
+                "id": 10,
+                "name": "ownerName",
+                "type": "string"
+              },
+              {
+                "id": 11,
+                "name": "defaultReplicationConfig",
+                "type": "hadoop.hdds.DefaultReplicationConfig"
               }
             ]
           },
@@ -2199,7 +2384,14 @@
             ]
           },
           {
-            "name": "SetBucketPropertyResponse"
+            "name": "SetBucketPropertyResponse",
+            "fields": [
+              {
+                "id": 1,
+                "name": "response",
+                "type": "bool"
+              }
+            ]
           },
           {
             "name": "DeleteBucketRequest",
@@ -2281,12 +2473,24 @@
               {
                 "id": 5,
                 "name": "type",
-                "type": "hadoop.hdds.ReplicationType"
+                "type": "hadoop.hdds.ReplicationType",
+                "options": [
+                  {
+                    "name": "default",
+                    "value": "NONE"
+                  }
+                ]
               },
               {
                 "id": 6,
                 "name": "factor",
-                "type": "hadoop.hdds.ReplicationFactor"
+                "type": "hadoop.hdds.ReplicationFactor",
+                "options": [
+                  {
+                    "name": "default",
+                    "value": "ZERO"
+                  }
+                ]
               },
               {
                 "id": 7,
@@ -2350,6 +2554,11 @@
                 "id": 18,
                 "name": "headOp",
                 "type": "bool"
+              },
+              {
+                "id": 19,
+                "name": "ecReplicationConfig",
+                "type": "hadoop.hdds.ECReplicationConfig"
               }
             ]
           },
@@ -2516,6 +2725,11 @@
                 "id": 16,
                 "name": "parentID",
                 "type": "uint64"
+              },
+              {
+                "id": 17,
+                "name": "ecReplicationConfig",
+                "type": "hadoop.hdds.ECReplicationConfig"
               }
             ]
           },
@@ -2988,22 +3202,42 @@
             "name": "PurgeKeysResponse"
           },
           {
-            "name": "PurgePathsRequest",
+            "name": "PurgeDirectoriesRequest",
             "fields": [
               {
                 "id": 1,
-                "name": "deletedDirs",
-                "type": "string",
+                "name": "deletedPath",
+                "type": "PurgePathRequest",
                 "is_repeated": true
+              }
+            ]
+          },
+          {
+            "name": "PurgePathRequest",
+            "fields": [
+              {
+                "id": 1,
+                "name": "volumeId",
+                "type": "uint64"
               },
               {
                 "id": 2,
+                "name": "bucketId",
+                "type": "uint64"
+              },
+              {
+                "id": 3,
+                "name": "deletedDir",
+                "type": "string"
+              },
+              {
+                "id": 4,
                 "name": "deletedSubFiles",
                 "type": "KeyInfo",
                 "is_repeated": true
               },
               {
-                "id": 3,
+                "id": 5,
                 "name": "markDeletedSubDirs",
                 "type": "KeyInfo",
                 "is_repeated": true
@@ -3011,7 +3245,7 @@
             ]
           },
           {
-            "name": "PurgePathsResponse"
+            "name": "PurgeDirectoriesResponse"
           },
           {
             "name": "DeleteOpenKeysRequest",
@@ -3056,7 +3290,13 @@
               {
                 "id": 2,
                 "name": "clientID",
-                "type": "uint64"
+                "type": "uint64",
+                "options": [
+                  {
+                    "name": "deprecated",
+                    "value": "true"
+                  }
+                ]
               }
             ]
           },
@@ -3269,6 +3509,11 @@
                 "id": 1,
                 "name": "sequenceNumber",
                 "type": "uint64"
+              },
+              {
+                "id": 2,
+                "name": "limitCount",
+                "type": "uint64"
               }
             ]
           },
@@ -3307,6 +3552,11 @@
                 "name": "data",
                 "type": "bytes",
                 "is_repeated": true
+              },
+              {
+                "id": 3,
+                "name": "latestSequenceNumber",
+                "type": "uint64"
               }
             ]
           },
@@ -3491,6 +3741,17 @@
                 "id": 4,
                 "name": "omRole",
                 "type": "OMRoleInfo"
+              },
+              {
+                "id": 5,
+                "name": "OMVersion",
+                "type": "int32",
+                "options": [
+                  {
+                    "name": "default",
+                    "value": "0"
+                  }
+                ]
               }
             ]
           },
@@ -3572,6 +3833,11 @@
                 "id": 8,
                 "name": "parentID",
                 "type": "uint64"
+              },
+              {
+                "id": 9,
+                "name": "ecReplicationConfig",
+                "type": "hadoop.hdds.ECReplicationConfig"
               }
             ]
           },
@@ -3752,6 +4018,11 @@
                 "name": "partsList",
                 "type": "PartInfo",
                 "is_repeated": true
+              },
+              {
+                "id": 7,
+                "name": "ecReplicationConfig",
+                "type": "hadoop.hdds.ECReplicationConfig"
               }
             ]
           },
@@ -3828,6 +4099,11 @@
                 "id": 7,
                 "name": "factor",
                 "type": "hadoop.hdds.ReplicationFactor"
+              },
+              {
+                "id": 8,
+                "name": "ecReplicationConfig",
+                "type": "hadoop.hdds.ECReplicationConfig"
               }
             ]
           },
@@ -3955,6 +4231,26 @@
                 "type": "string"
               }
             ]
+          },
+          {
+            "name": "S3Authentication",
+            "fields": [
+              {
+                "id": 1,
+                "name": "stringToSign",
+                "type": "string"
+              },
+              {
+                "id": 2,
+                "name": "signature",
+                "type": "string"
+              },
+              {
+                "id": 3,
+                "name": "accessId",
+                "type": "string"
+              }
+            ]
           }
         ],
         "services": [
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index 014128995d..67356a88f3 100644
--- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -439,32 +439,42 @@ public interface OMMetadataManager extends DBStoreHAManager {
    * Given parent object id and path component name, return the corresponding
    * DB 'prefixKey' key.
    *
+   * @param volumeId - ID of the volume
+   * @param bucketId - ID of the bucket
    * @param parentObjectId - parent object Id
    * @param pathComponentName   - path component name
    * @return DB directory key as String.
    */
-  String getOzonePathKey(long parentObjectId, String pathComponentName);
+  String getOzonePathKey(long volumeId, long bucketId,
+                         long parentObjectId, String pathComponentName);
 
   /**
    * Returns DB key name of an open file in OM metadata store. Should be
    * #open# prefix followed by actual leaf node name.
    *
+   * @param volumeId       - ID of the volume
+   * @param bucketId       - ID of the bucket
    * @param parentObjectId - parent object Id
    * @param fileName       - file name
    * @param id             - client id for this open request
    * @return DB directory key as String.
    */
-  String getOpenFileName(long parentObjectId, String fileName, long id);
+  String getOpenFileName(long volumeId, long bucketId,
+                         long parentObjectId, String fileName, long id);
 
   /**
    * Returns the DB key name of a multipart upload key in OM metadata store.
    *
+   * @param volumeId       - ID of the volume
+   * @param bucketId       - ID of the bucket
    * @param parentObjectId - parent object Id
    * @param fileName       - file name
    * @param uploadId       - the upload id for this key
    * @return bytes of DB key.
    */
-  String getMultipartKey(long parentObjectId, String fileName, String uploadId);
+  String getMultipartKey(long volumeId, long bucketId,
+                         long parentObjectId, String fileName,
+                         String uploadId);
 
   /**
    * Get Deleted Directory Table.
@@ -473,4 +483,23 @@ public interface OMMetadataManager extends DBStoreHAManager {
    */
   Table<String, OmKeyInfo> getDeletedDirTable();
 
+  /**
+   * Get the ID of the given volume.
+   *
+   * @param volume volume name
+   * @return ID of the volume
+   * @throws IOException
+   */
+  long getVolumeId(String volume) throws IOException;
+
+  /**
+   * Get the ID of the given bucket.
+   *
+   * @param volume volume name
+   * @param bucket bucket name
+   * @return ID of the bucket
+   * @throws IOException
+   */
+  long getBucketId(String volume, String bucket) throws IOException;
+
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DirectoryDeletingService.java
index 2aac5bd827..a4c292a3a6 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DirectoryDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DirectoryDeletingService.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.hdds.utils.BackgroundTask;
 import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -35,9 +36,11 @@ import org.apache.ratis.protocol.RaftClientRequest;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT;
 
@@ -126,16 +129,22 @@ public class DirectoryDeletingService extends BackgroundService {
         try {
           long startTime = Time.monotonicNow();
           // step-1) Get one pending deleted directory
-          OmKeyInfo pendingDeletedDirInfo =
+          Table.KeyValue<String, OmKeyInfo> pendingDeletedDirInfo =
               ozoneManager.getKeyManager().getPendingDeletionDir();
           if (pendingDeletedDirInfo != null) {
             if (LOG.isDebugEnabled()) {
               LOG.debug("Pending deleted dir name: {}",
-                  pendingDeletedDirInfo.getKeyName());
+                  pendingDeletedDirInfo.getValue().getKeyName());
             }
+            final String[] keys = pendingDeletedDirInfo.getKey()
+                    .split(OM_KEY_PREFIX);
+            final long volumeId = Long.parseLong(keys[1]);
+            final long bucketId = Long.parseLong(keys[2]);
+
             // step-1: get all sub directories under the deletedDir
             List<OmKeyInfo> dirs = ozoneManager.getKeyManager()
-                .getPendingDeletionSubDirs(pendingDeletedDirInfo, count);
+                .getPendingDeletionSubDirs(volumeId, bucketId,
+                        pendingDeletedDirInfo.getValue(), count);
             count = count - dirs.size();
             List<OmKeyInfo> deletedSubDirList = new ArrayList<>();
             for (OmKeyInfo dirInfo : dirs) {
@@ -148,7 +157,8 @@ public class DirectoryDeletingService extends BackgroundService {
 
             // step-2: get all sub files under the deletedDir
             List<OmKeyInfo> purgeDeletedFiles = ozoneManager.getKeyManager()
-                .getPendingDeletionSubFiles(pendingDeletedDirInfo, count);
+                .getPendingDeletionSubFiles(volumeId, bucketId,
+                        pendingDeletedDirInfo.getValue(), count);
             count = count - purgeDeletedFiles.size();
 
             if (LOG.isDebugEnabled()) {
@@ -162,21 +172,17 @@ public class DirectoryDeletingService extends BackgroundService {
             // limit. If count reached limit then there can be some more child
             // paths to be visited and will keep the parent deleted directory
             // for one more pass.
-            List<String> purgeDeletedDirs = new ArrayList<>();
-            if (count > 0) {
-              // TODO: Now, there is only one entry in this list. Maintained
-              //  list data structure becuase this can be extended to add
-              //  more directories within the batchSize limit.
-              purgeDeletedDirs.add(pendingDeletedDirInfo.getPath());
-            }
+            final Optional<String> purgeDeletedDir = count > 0 ?
+                    Optional.of(pendingDeletedDirInfo.getKey()) :
+                    Optional.empty();
 
             if (isRatisEnabled()) {
-              submitPurgePaths(purgeDeletedDirs, purgeDeletedFiles,
-                  deletedSubDirList);
+              submitPurgePaths(volumeId, bucketId, purgeDeletedDir,
+                      purgeDeletedFiles, deletedSubDirList);
             }
             // TODO: need to handle delete with non-ratis
 
-            deletedDirsCount.addAndGet(purgeDeletedDirs.size());
+            deletedDirsCount.incrementAndGet();
             deletedFilesCount.addAndGet(purgeDeletedFiles.size());
             if (LOG.isDebugEnabled()) {
               LOG.debug("Number of dirs deleted: {}, Number of files moved:" +
@@ -227,15 +233,17 @@ public class DirectoryDeletingService extends BackgroundService {
     return runCount.get();
   }
 
-  private int submitPurgePaths(List<String> purgeDeletedDirs,
-      List<OmKeyInfo> purgeDeletedFiles, List<OmKeyInfo> markDirsAsDeleted) {
+  private int submitPurgePaths(final long volumeId, final long bucketId,
+      final Optional<String> purgeDeletedDir,
+      final List<OmKeyInfo> purgeDeletedFiles,
+      final List<OmKeyInfo> markDirsAsDeleted) {
     // Put all keys to be purged in a list
     int deletedCount = 0;
-    OzoneManagerProtocolProtos.PurgePathsRequest.Builder purgePathsRequest =
-        OzoneManagerProtocolProtos.PurgePathsRequest.newBuilder();
-    for (String purgeDir : purgeDeletedDirs) {
-      purgePathsRequest.addDeletedDirs(purgeDir);
-    }
+    OzoneManagerProtocolProtos.PurgePathRequest.Builder purgePathsRequest =
+        OzoneManagerProtocolProtos.PurgePathRequest.newBuilder();
+    purgePathsRequest.setVolumeId(volumeId);
+    purgePathsRequest.setBucketId(bucketId);
+    purgeDeletedDir.ifPresent(purgePathsRequest::setDeletedDir);
     for (OmKeyInfo purgeFile : purgeDeletedFiles) {
       purgePathsRequest.addDeletedSubFiles(
           purgeFile.getProtobuf(true, ClientVersion.CURRENT_VERSION));
@@ -248,10 +256,15 @@ public class DirectoryDeletingService extends BackgroundService {
           dir.getProtobuf(ClientVersion.CURRENT_VERSION));
     }
 
+    OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest =
+            OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder();
+    purgeDirRequest.addDeletedPath(purgePathsRequest.build());
+
+
     OzoneManagerProtocolProtos.OMRequest omRequest =
         OzoneManagerProtocolProtos.OMRequest.newBuilder()
-            .setCmdType(OzoneManagerProtocolProtos.Type.PurgePaths)
-            .setPurgePathsRequest(purgePathsRequest)
+            .setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories)
+            .setPurgeDirectoriesRequest(purgeDirRequest)
             .setClientId(clientId.toString())
             .build();
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index 9e5ae1cc49..7bcb00fafb 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.ozone.om;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
@@ -194,7 +195,7 @@ public interface KeyManager extends OzoneManagerFS, IOzoneAcl {
    * @return OmKeyInfo
    * @throws IOException
    */
-  OmKeyInfo getPendingDeletionDir() throws IOException;
+  Table.KeyValue<String, OmKeyInfo> getPendingDeletionDir() throws IOException;
 
   /**
    * Returns all sub directories under the given parent directory.
@@ -204,8 +205,8 @@ public interface KeyManager extends OzoneManagerFS, IOzoneAcl {
    * @return list of dirs
    * @throws IOException
    */
-  List<OmKeyInfo> getPendingDeletionSubDirs(OmKeyInfo parentInfo,
-      long numEntries) throws IOException;
+  List<OmKeyInfo> getPendingDeletionSubDirs(long volumeId, long bucketId,
+      OmKeyInfo parentInfo, long numEntries) throws IOException;
 
   /**
    * Returns all sub files under the given parent directory.
@@ -215,8 +216,9 @@ public interface KeyManager extends OzoneManagerFS, IOzoneAcl {
    * @return list of files
    * @throws IOException
    */
-  List<OmKeyInfo> getPendingDeletionSubFiles(OmKeyInfo parentInfo,
-      long numEntries) throws IOException;
+  List<OmKeyInfo> getPendingDeletionSubFiles(long volumeId,
+      long bucketId, OmKeyInfo parentInfo, long numEntries)
+          throws IOException;
 
   /**
    * Returns the instance of Directory Deleting Service.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index c749e7bc62..35ebbd1589 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -803,13 +803,14 @@ public class KeyManagerImpl implements KeyManager {
     OMMetadataManager metaMgr = ozoneManager.getMetadataManager();
     String fileName = OzoneFSUtils.getFileName(keyName);
     Iterator<Path> pathComponents = Paths.get(keyName).iterator();
-    String bucketKey = metaMgr.getBucketKey(volumeName, bucketName);
-    OmBucketInfo omBucketInfo = metaMgr.getBucketTable().get(bucketKey);
-    long bucketId = omBucketInfo.getObjectID();
+    final long volumeId = metaMgr.getVolumeId(volumeName);
+    final long bucketId = metaMgr.getBucketId(volumeName, bucketName);
     long parentID =
-        OMFileRequest.getParentID(bucketId, pathComponents, keyName, metaMgr);
+        OMFileRequest.getParentID(volumeId, bucketId, pathComponents,
+                keyName, metaMgr);
 
-    String multipartKey = metaMgr.getMultipartKey(parentID, fileName, uploadID);
+    String multipartKey = metaMgr.getMultipartKey(volumeId, bucketId,
+            parentID, fileName, uploadID);
 
     return multipartKey;
   }
@@ -1689,6 +1690,8 @@ public class KeyManagerImpl implements KeyManager {
     final String volumeName = args.getVolumeName();
     final String bucketName = args.getBucketName();
     final String keyName = args.getKeyName();
+    final long volumeId = metadataManager.getVolumeId(volumeName);
+    final long bucketId = metadataManager.getBucketId(volumeName, bucketName);
     String seekFileInDB;
     String seekDirInDB;
     long prefixKeyInDB;
@@ -1724,13 +1727,12 @@ public class KeyManagerImpl implements KeyManager {
         prefixKeyInDB = fileStatus.getKeyInfo().getObjectID();
       } else {
         // list root directory.
-        String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
-        OmBucketInfo omBucketInfo =
-            metadataManager.getBucketTable().get(bucketKey);
-        prefixKeyInDB = omBucketInfo.getObjectID();
+        prefixKeyInDB = bucketId;
       }
-      seekFileInDB = metadataManager.getOzonePathKey(prefixKeyInDB, "");
-      seekDirInDB = metadataManager.getOzonePathKey(prefixKeyInDB, "");
+      seekFileInDB = metadataManager.getOzonePathKey(
+              volumeId, bucketId, prefixKeyInDB, "");
+      seekDirInDB = metadataManager.getOzonePathKey(
+              volumeId, bucketId, prefixKeyInDB, "");
 
       // Order of seek ->
       // (1)Seek files in fileTable
@@ -1800,7 +1802,8 @@ public class KeyManagerImpl implements KeyManager {
         prefixKeyInDB = fileStatusInfo.getKeyInfo().getParentObjectID();
 
         if (fileStatusInfo.isDirectory()) {
-          seekDirInDB = metadataManager.getOzonePathKey(prefixKeyInDB,
+          seekDirInDB = metadataManager.getOzonePathKey(
+                  volumeId, bucketId, prefixKeyInDB,
               fileStatusInfo.getKeyInfo().getFileName());
 
           // Order of seek -> (1) Seek dirs only in dirTable. In OM, always
@@ -1821,10 +1824,12 @@ public class KeyManagerImpl implements KeyManager {
           }
 
         } else {
-          seekFileInDB = metadataManager.getOzonePathKey(prefixKeyInDB,
+          seekFileInDB = metadataManager.getOzonePathKey(
+                  volumeId, bucketId, prefixKeyInDB,
               fileStatusInfo.getKeyInfo().getFileName());
           // begins from the first sub-dir under the parent dir
-          seekDirInDB = metadataManager.getOzonePathKey(prefixKeyInDB, "");
+          seekDirInDB = metadataManager.getOzonePathKey(
+                  volumeId, bucketId, prefixKeyInDB, "");
 
           // First under lock obtain both entries from dir/file cache and
           // generate entries marked for delete.
@@ -2032,9 +2037,18 @@ public class KeyManagerImpl implements KeyManager {
     while (iterator.hasNext() && numEntries - countEntries > 0) {
       Table.KeyValue<String, OmKeyInfo> entry = iterator.next();
       OmKeyInfo keyInfo = entry.getValue();
-      if (deletedKeySet.contains(keyInfo.getPath())) {
-        iterator.next(); // move to next entry in the table
-        // entry is actually deleted in cache and can exists in DB
+      if (!entry.getKey().startsWith(seekKeyInDB)) {
+        break;
+      }
+      final long volumeId = metadataManager.getVolumeId(
+              keyInfo.getVolumeName());
+      final long bucketId = metadataManager.getBucketId(
+              keyInfo.getVolumeName(), keyInfo.getBucketName());
+      final String keyPath = metadataManager.getOzonePathKey(volumeId,
+              bucketId, keyInfo.getParentObjectID(), keyInfo.getFileName());
+      if (deletedKeySet.contains(keyPath)) {
+        // move to next entry in the table
+        // entry is actually deleted in cache and can exist in DB
         continue;
       }
       if (!OMFileRequest.isImmediateChild(keyInfo.getParentObjectID(),
@@ -2340,25 +2354,27 @@ public class KeyManagerImpl implements KeyManager {
   }
 
   @Override
-  public OmKeyInfo getPendingDeletionDir() throws IOException {
-    OmKeyInfo omKeyInfo = null;
+  public Table.KeyValue<String, OmKeyInfo> getPendingDeletionDir()
+          throws IOException {
+    // TODO: Change the return type to OmDirectoryInfo after adding
+    //  volumeId and bucketId to OmDirectoryInfo
     try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
              deletedDirItr = metadataManager.getDeletedDirTable().iterator()) {
       if (deletedDirItr.hasNext()) {
         Table.KeyValue<String, OmKeyInfo> keyValue = deletedDirItr.next();
         if (keyValue != null) {
-          omKeyInfo = keyValue.getValue();
+          return keyValue;
         }
       }
     }
-    return omKeyInfo;
+    return null;
   }
 
   @Override
-  public List<OmKeyInfo> getPendingDeletionSubDirs(OmKeyInfo parentInfo,
-      long numEntries) throws IOException {
+  public List<OmKeyInfo> getPendingDeletionSubDirs(long volumeId, long bucketId,
+      OmKeyInfo parentInfo, long numEntries) throws IOException {
     List<OmKeyInfo> directories = new ArrayList<>();
-    String seekDirInDB = metadataManager.getOzonePathKey(
+    String seekDirInDB = metadataManager.getOzonePathKey(volumeId, bucketId,
         parentInfo.getObjectID(), "");
     long countEntries = 0;
 
@@ -2388,10 +2404,11 @@ public class KeyManagerImpl implements KeyManager {
   }
 
   @Override
-  public List<OmKeyInfo> getPendingDeletionSubFiles(OmKeyInfo parentInfo,
-      long numEntries) throws IOException {
+  public List<OmKeyInfo> getPendingDeletionSubFiles(long volumeId,
+      long bucketId, OmKeyInfo parentInfo, long numEntries)
+          throws IOException {
     List<OmKeyInfo> files = new ArrayList<>();
-    String seekFileInDB = metadataManager.getOzonePathKey(
+    String seekFileInDB = metadataManager.getOzonePathKey(volumeId, bucketId,
         parentInfo.getObjectID(), "");
     long countEntries = 0;
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 5bb06168ef..ed46c7f82e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -157,15 +157,15 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
    *
    * Prefix Tables:
    * |----------------------------------------------------------------------|
-   * |  Column Family     |        VALUE                                    |
+   * |  Column Family   |        VALUE                                      |
    * |----------------------------------------------------------------------|
-   * |  directoryTable    | parentId/directoryName -> DirectoryInfo         |
+   * |  directoryTable  | /volumeId/bucketId/parentId/dirName -> DirInfo    |
    * |----------------------------------------------------------------------|
-   * |  fileTable         | parentId/fileName -> KeyInfo                    |
+   * |  fileTable       | /volumeId/bucketId/parentId/fileName -> KeyInfo   |
    * |----------------------------------------------------------------------|
-   * |  openFileTable     | parentId/fileName/id -> KeyInfo                 |
+   * |  openFileTable   | /volumeId/bucketId/parentId/fileName/id -> KeyInfo|
    * |----------------------------------------------------------------------|
-   * |  deletedDirTable   | parentId/directoryName -> KeyInfo               |
+   * |  deletedDirTable | /volumeId/bucketId/parentId/dirName -> KeyInfo    |
    * |----------------------------------------------------------------------|
    */
 
@@ -765,6 +765,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
       throws IOException {
     String bucketKey = getBucketKey(volume, bucket);
     OmBucketInfo omBucketInfo = getBucketTable().get(bucketKey);
+    String volumeId = String.valueOf(getVolumeId(
+            omBucketInfo.getVolumeName()));
     String bucketId = String.valueOf(omBucketInfo.getObjectID());
     BucketLayout bucketLayout = omBucketInfo.getBucketLayout();
 
@@ -776,10 +778,13 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
     //    - TOP-LEVEL DIRECTORY would be of the format <bucket ID>/dirName
     //      inside the dirTable.
     //    - TOP-LEVEL FILE (a file directly placed under the bucket without
-    //      any sub paths) would be of the format <bucket ID>/fileName inside
-    //      the fileTable.
+    //      any sub paths) would be of the format
+    //      /<volume ID>/<bucket ID>/<parent ID>/fileName inside the
+    //      fileTable, where <parent ID> equals <bucket ID> for
+    //      top-level entries.
     String keyPrefix =
-        bucketLayout.isFileSystemOptimized() ? bucketId : bucketKey;
+        bucketLayout.isFileSystemOptimized() ?
+                OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX +
+                        bucketId + OM_KEY_PREFIX + bucketId + OM_KEY_PREFIX :
+                bucketKey;
 
     // Check key/file Table
     Table<String, OmKeyInfo> table = getKeyTable(bucketLayout);
@@ -1422,28 +1427,42 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
   }
 
   @Override
-  public String getOzonePathKey(long parentObjectId, String pathComponentName) {
-    StringBuilder builder = new StringBuilder();
-    builder.append(parentObjectId);
-    builder.append(OM_KEY_PREFIX).append(pathComponentName);
+  public String getOzonePathKey(final long volumeId, final long bucketId,
+                                final long parentObjectId,
+                                final String pathComponentName) {
+    final StringBuilder builder = new StringBuilder();
+    builder.append(OM_KEY_PREFIX)
+            .append(volumeId)
+            .append(OM_KEY_PREFIX)
+            .append(bucketId)
+            .append(OM_KEY_PREFIX)
+            .append(parentObjectId)
+            .append(OM_KEY_PREFIX)
+            .append(pathComponentName);
     return builder.toString();
   }
 
   @Override
-  public String getOpenFileName(long parentID, String fileName,
+  public String getOpenFileName(long volumeId, long bucketId,
+                                long parentID, String fileName,
                                 long id) {
     StringBuilder openKey = new StringBuilder();
-    openKey.append(parentID);
+    openKey.append(OM_KEY_PREFIX).append(volumeId);
+    openKey.append(OM_KEY_PREFIX).append(bucketId);
+    openKey.append(OM_KEY_PREFIX).append(parentID);
     openKey.append(OM_KEY_PREFIX).append(fileName);
     openKey.append(OM_KEY_PREFIX).append(id);
     return openKey.toString();
   }
 
   @Override
-  public String getMultipartKey(long parentID, String fileName,
+  public String getMultipartKey(long volumeId, long bucketId,
+                                long parentID, String fileName,
                                 String uploadId) {
     StringBuilder openKey = new StringBuilder();
-    openKey.append(parentID);
+    openKey.append(OM_KEY_PREFIX).append(volumeId);
+    openKey.append(OM_KEY_PREFIX).append(bucketId);
+    openKey.append(OM_KEY_PREFIX).append(parentID);
     openKey.append(OM_KEY_PREFIX).append(fileName);
     openKey.append(OM_KEY_PREFIX).append(uploadId);
     return openKey.toString();
@@ -1452,4 +1471,14 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
   public BucketLayout getBucketLayout() {
     return BucketLayout.DEFAULT;
   }
+
+  @Override
+  public long getVolumeId(String volume) throws IOException {
+    return getVolumeTable().get(getVolumeKey(volume)).getObjectID();
+  }
+
+  @Override
+  public long getBucketId(String volume, String bucket) throws IOException {
+    return getBucketTable().get(getBucketKey(volume, bucket)).getObjectID();
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneListStatusHelper.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneListStatusHelper.java
index a6dbcd0428..8d35027b21 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneListStatusHelper.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneListStatusHelper.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -95,13 +96,20 @@ public class OzoneListStatusHelper {
     String keyName = args.getKeyName();
     String prefixKey = keyName;
 
-    String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
-    OmBucketInfo omBucketInfo =
-        metadataManager.getBucketTable().get(bucketKey);
-    if (omBucketInfo == null) {
+    final String volumeKey = metadataManager.getVolumeKey(volumeName);
+    final String bucketKey = metadataManager.getBucketKey(volumeName,
+            bucketName);
+
+    final OmVolumeArgs volumeInfo = metadataManager.getVolumeTable()
+            .get(volumeKey);
+    final OmBucketInfo omBucketInfo = metadataManager.getBucketTable()
+            .get(bucketKey);
+
+    if (volumeInfo == null || omBucketInfo == null) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Volume:{} Bucket:{} does not exist",
-            volumeName, bucketName);
+        LOG.debug(String.format("%s does not exist.", (volumeInfo == null) ?
+                "Volume : " + volumeName :
+                "Bucket: " + volumeName + "/" + bucketName));
       }
       return new ArrayList<>();
     }
@@ -136,7 +144,7 @@ public class OzoneListStatusHelper {
       // if the file status is null, prefix is a not a valid filesystem path
       // this should only work in list keys mode.
       // fetch the db key based on the prefix path.
-      dbPrefixKey = getDbKey(keyName, args, omBucketInfo);
+      dbPrefixKey = getDbKey(keyName, args, volumeInfo, omBucketInfo);
       prefixKey = OzoneFSUtils.getParentDir(keyName);
     } else {
       // If the keyname is a file just return one entry
@@ -146,13 +154,16 @@ public class OzoneListStatusHelper {
 
       // fetch the db key based on parent prefix id.
       long id = getId(fileStatus, omBucketInfo);
-      dbPrefixKey = metadataManager.getOzonePathKey(id, "");
+      final long volumeId = volumeInfo.getObjectID();
+      final long bucketId = omBucketInfo.getObjectID();
+      dbPrefixKey = metadataManager.getOzonePathKey(volumeId, bucketId,
+              id, "");
     }
 
     // Determine startKeyPrefix for DB iteration
     String startKeyPrefix =
         Strings.isNullOrEmpty(startKey) ? "" :
-            getDbKey(startKey, args, omBucketInfo);
+            getDbKey(startKey, args, volumeInfo, omBucketInfo);
 
     TreeMap<String, OzoneFileStatus> map = new TreeMap<>();
 
@@ -175,6 +186,7 @@ public class OzoneListStatusHelper {
   }
 
   private String getDbKey(String key, OmKeyArgs args,
+                          OmVolumeArgs volumeInfo,
                           OmBucketInfo omBucketInfo) throws IOException {
     long startKeyParentId;
     String parent = OzoneFSUtils.getParentDir(key);
@@ -189,8 +201,11 @@ public class OzoneListStatusHelper {
         null, true);
     Preconditions.checkNotNull(fileStatusInfo);
     startKeyParentId = getId(fileStatusInfo, omBucketInfo);
+    final long volumeId = volumeInfo.getObjectID();
+    final long bucketId = omBucketInfo.getObjectID();
     return metadataManager.
-        getOzonePathKey(startKeyParentId, OzoneFSUtils.getFileName(key));
+        getOzonePathKey(volumeId, bucketId, startKeyParentId,
+                OzoneFSUtils.getFileName(key));
   }
 
   private long getId(OzoneFileStatus fileStatus, OmBucketInfo omBucketInfo) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index 5915992943..86254064c3 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -46,7 +46,7 @@ import org.apache.hadoop.ozone.om.request.bucket.acl.OMBucketAddAclRequest;
 import org.apache.hadoop.ozone.om.request.bucket.acl.OMBucketRemoveAclRequest;
 import org.apache.hadoop.ozone.om.request.bucket.acl.OMBucketSetAclRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyPurgeRequest;
-import org.apache.hadoop.ozone.om.request.key.OMPathsPurgeRequestWithFSO;
+import org.apache.hadoop.ozone.om.request.key.OMDirectoriesPurgeRequestWithFSO;
 import org.apache.hadoop.ozone.om.request.key.OMTrashRecoverRequest;
 import org.apache.hadoop.ozone.om.request.key.acl.OMKeyAddAclRequest;
 import org.apache.hadoop.ozone.om.request.key.acl.OMKeyAddAclRequestWithFSO;
@@ -188,8 +188,8 @@ public final class OzoneManagerRatisUtils {
       return new S3RevokeSecretRequest(omRequest);
     case PurgeKeys:
       return new OMKeyPurgeRequest(omRequest);
-    case PurgePaths:
-      return new OMPathsPurgeRequestWithFSO(omRequest);
+    case PurgeDirectories:
+      return new OMDirectoriesPurgeRequestWithFSO(omRequest);
     case CreateTenant:
       ozoneManager.checkS3MultiTenancyEnabled();
       return new OMTenantCreateRequest(omRequest);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestWithFSO.java
index 2b65cdf1c5..f7fba71b84 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestWithFSO.java
@@ -156,6 +156,10 @@ public class OMDirectoryCreateRequestWithFSO extends OMDirectoryCreateRequest {
                 OMDirectoryCreateRequestWithFSO.getAllMissingParentDirInfo(
                         ozoneManager, keyArgs, omPathInfo, trxnLogIndex);
 
+        final long volumeId = omMetadataManager.getVolumeId(volumeName);
+        final long bucketId = omMetadataManager
+                .getBucketId(volumeName, bucketName);
+
         // prepare leafNode dir
         OmDirectoryInfo dirInfo = createDirectoryInfoWithACL(
                 omPathInfo.getLeafNodeName(),
@@ -163,15 +167,16 @@ public class OMDirectoryCreateRequestWithFSO extends OMDirectoryCreateRequest {
                 omPathInfo.getLastKnownParentId(), trxnLogIndex,
                 OzoneAclUtil.fromProtobuf(keyArgs.getAclsList()));
         OMFileRequest.addDirectoryTableCacheEntries(omMetadataManager,
-                Optional.of(dirInfo), Optional.of(missingParentInfos),
-                trxnLogIndex);
+                volumeId, bucketId, trxnLogIndex,
+                Optional.of(missingParentInfos), Optional.of(dirInfo));
 
         // total number of keys created.
         numKeysCreated = missingParentInfos.size() + 1;
 
         result = OMDirectoryCreateRequest.Result.SUCCESS;
         omClientResponse =
-            new OMDirectoryCreateResponseWithFSO(omResponse.build(), dirInfo,
+            new OMDirectoryCreateResponseWithFSO(omResponse.build(),
+                    volumeName, bucketName, dirInfo,
                 missingParentInfos, result, getBucketLayout());
       } else {
         result = Result.DIRECTORY_ALREADY_EXISTS;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestWithFSO.java
index 77d97aa1d4..d1f60cfd67 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestWithFSO.java
@@ -130,6 +130,10 @@ public class OMFileCreateRequestWithFSO extends OMFileCreateRequest {
 
       validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
 
+      final long volumeId = omMetadataManager.getVolumeId(volumeName);
+      final long bucketId = omMetadataManager
+              .getBucketId(volumeName, bucketName);
+
       OmKeyInfo dbFileInfo = null;
 
       OMFileRequest.OMPathInfoWithFSO pathInfoFSO =
@@ -138,7 +142,7 @@ public class OMFileCreateRequestWithFSO extends OMFileCreateRequest {
 
       if (pathInfoFSO.getDirectoryResult()
               == OMFileRequest.OMDirectoryResult.FILE_EXISTS) {
-        String dbFileKey = omMetadataManager.getOzonePathKey(
+        String dbFileKey = omMetadataManager.getOzonePathKey(volumeId, bucketId,
                 pathInfoFSO.getLastKnownParentId(),
                 pathInfoFSO.getLeafNodeName());
         dbFileInfo = OMFileRequest.getOmKeyInfoFromFileTable(false,
@@ -180,8 +184,9 @@ public class OMFileCreateRequestWithFSO extends OMFileCreateRequest {
       long openVersion = omFileInfo.getLatestVersionLocations().getVersion();
       long clientID = createFileRequest.getClientID();
       String dbOpenFileName = omMetadataManager
-          .getOpenFileName(pathInfoFSO.getLastKnownParentId(),
-              pathInfoFSO.getLeafNodeName(), clientID);
+          .getOpenFileName(volumeId, bucketId,
+                  pathInfoFSO.getLastKnownParentId(),
+                  pathInfoFSO.getLeafNodeName(), clientID);
 
       // Append new blocks
       List<OmKeyLocationInfo> newLocationList = keyArgs.getKeyLocationsList()
@@ -206,9 +211,9 @@ public class OMFileCreateRequestWithFSO extends OMFileCreateRequest {
 
       // Add cache entries for the prefix directories.
       // Skip adding for the file key itself, until Key Commit.
-      OMFileRequest.addDirectoryTableCacheEntries(omMetadataManager,
-              Optional.absent(), Optional.of(missingParentInfos),
-              trxnLogIndex);
+      OMFileRequest.addDirectoryTableCacheEntries(omMetadataManager, volumeId,
+              bucketId, trxnLogIndex, Optional.of(missingParentInfos),
+              Optional.absent());
 
       // Prepare response. Sets user given full key name in the 'keyName'
       // attribute in response object.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
index b5fe2bc98a..8363be8d9b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
-import static org.apache.hadoop.ozone.om.OzoneManagerUtils.getBucketLayout;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
@@ -57,6 +56,8 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
+import static org.apache.hadoop.ozone.om.OzoneManagerUtils.getBucketLayout;
+
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
@@ -187,6 +188,11 @@ public final class OMFileRequest {
     String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
     OmBucketInfo omBucketInfo =
             omMetadataManager.getBucketTable().get(bucketKey);
+
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
+
     // by default, inherit bucket ACLs
     List<OzoneAcl> inheritAcls = omBucketInfo.getAcls();
 
@@ -213,7 +219,7 @@ public final class OMFileRequest {
       // 1. Do lookup on directoryTable. If not exists goto next step.
       // 2. Do look on keyTable. If not exists goto next step.
       // 3. Add 'sub-dir' to missing parents list
-      String dbNodeName = omMetadataManager.getOzonePathKey(
+      String dbNodeName = omMetadataManager.getOzonePathKey(volumeId, bucketId,
               lastKnownParentId, fileName);
       OmDirectoryInfo omDirInfo = omMetadataManager.getDirectoryTable().
               get(dbNodeName);
@@ -428,26 +434,30 @@ public final class OMFileRequest {
    * Adding directory info to the Table cache.
    *
    * @param omMetadataManager  OM Metadata Manager
-   * @param dirInfo            directory info
-   * @param missingParentInfos list of the parents to be added to DB
+   * @param volumeId           ID of the Volume
+   * @param bucketId           ID of the Bucket
    * @param trxnLogIndex       transaction log index
+   * @param missingParentInfos list of the parents to be added to DB
+   * @param dirInfo            directory info
    */
   public static void addDirectoryTableCacheEntries(
           OMMetadataManager omMetadataManager,
-          Optional<OmDirectoryInfo> dirInfo,
+          long volumeId, long bucketId, long trxnLogIndex,
           Optional<List<OmDirectoryInfo>> missingParentInfos,
-          long trxnLogIndex) {
+          Optional<OmDirectoryInfo> dirInfo) {
+
     for (OmDirectoryInfo subDirInfo : missingParentInfos.get()) {
       omMetadataManager.getDirectoryTable().addCacheEntry(
               new CacheKey<>(omMetadataManager.getOzonePathKey(
-                      subDirInfo.getParentObjectID(), subDirInfo.getName())),
+                      volumeId, bucketId, subDirInfo.getParentObjectID(),
+                      subDirInfo.getName())),
               new CacheValue<>(Optional.of(subDirInfo), trxnLogIndex));
     }
 
     if (dirInfo.isPresent()) {
       omMetadataManager.getDirectoryTable().addCacheEntry(
               new CacheKey<>(omMetadataManager.getOzonePathKey(
-                      dirInfo.get().getParentObjectID(),
+                      volumeId, bucketId, dirInfo.get().getParentObjectID(),
                       dirInfo.get().getName())),
               new CacheValue<>(dirInfo, trxnLogIndex));
     }
@@ -541,7 +551,11 @@ public final class OMFileRequest {
                                         long openKeySessionID)
           throws IOException {
 
-    String dbOpenFileKey = omMetadataMgr.getOpenFileName(
+    final long volumeId = omMetadataMgr.getVolumeId(
+            omFileInfo.getVolumeName());
+    final long bucketId = omMetadataMgr.getBucketId(
+            omFileInfo.getVolumeName(), omFileInfo.getBucketName());
+    String dbOpenFileKey = omMetadataMgr.getOpenFileName(volumeId, bucketId,
             omFileInfo.getParentObjectID(), omFileInfo.getFileName(),
             openKeySessionID);
 
@@ -563,9 +577,14 @@ public final class OMFileRequest {
       BatchOperation batchOp, OmKeyInfo omFileInfo, String uploadID)
           throws IOException {
 
-    String multipartFileKey = omMetadataMgr.getMultipartKey(
-            omFileInfo.getParentObjectID(), omFileInfo.getFileName(),
-            uploadID);
+    final long volumeId = omMetadataMgr.getVolumeId(
+            omFileInfo.getVolumeName());
+    final long bucketId = omMetadataMgr.getBucketId(
+            omFileInfo.getVolumeName(), omFileInfo.getBucketName());
+
+    String multipartFileKey = omMetadataMgr.getMultipartKey(volumeId,
+            bucketId, omFileInfo.getParentObjectID(),
+            omFileInfo.getFileName(), uploadID);
 
     omMetadataMgr.getOpenKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED)
         .putWithBatch(batchOp, multipartFileKey, omFileInfo);
@@ -586,8 +605,11 @@ public final class OMFileRequest {
                                     BatchOperation batchOp,
                                     OmKeyInfo omFileInfo)
           throws IOException {
-
-    String dbFileKey = omMetadataMgr.getOzonePathKey(
+    final long volumeId = omMetadataMgr.getVolumeId(
+            omFileInfo.getVolumeName());
+    final long bucketId = omMetadataMgr.getBucketId(
+            omFileInfo.getVolumeName(), omFileInfo.getBucketName());
+    String dbFileKey = omMetadataMgr.getOzonePathKey(volumeId, bucketId,
             omFileInfo.getParentObjectID(), omFileInfo.getFileName());
 
     omMetadataMgr.getKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED)
@@ -652,6 +674,7 @@ public final class OMFileRequest {
 
     Path keyPath = Paths.get(keyName);
     Iterator<Path> elements = keyPath.iterator();
+    final long volumeId = omMetadataMgr.getVolumeId(volumeName);
     String bucketKey = omMetadataMgr.getBucketKey(volumeName, bucketName);
     OmBucketInfo omBucketInfo =
             omMetadataMgr.getBucketTable().get(bucketKey);
@@ -667,6 +690,7 @@ public final class OMFileRequest {
       // 2. If there is no dir exists for the leaf node component 'file1.txt'
       // then do look it on fileTable.
       String dbNodeName = omMetadataMgr.getOzonePathKey(
+              volumeId, omBucketInfo.getObjectID(),
               lastKnownParentId, fileName);
       omDirInfo = omMetadataMgr.getDirectoryTable().get(dbNodeName);
 
@@ -872,8 +896,12 @@ public final class OMFileRequest {
       }
     }
 
+    final long volumeId = metaMgr.getVolumeId(omKeyInfo.getVolumeName());
+    final long bucketId = metaMgr.getBucketId(omKeyInfo.getVolumeName(),
+            omKeyInfo.getBucketName());
     // Check dirTable entries for any sub paths.
-    String seekDirInDB = metaMgr.getOzonePathKey(omKeyInfo.getObjectID(), "");
+    String seekDirInDB = metaMgr.getOzonePathKey(volumeId, bucketId,
+            omKeyInfo.getObjectID(), "");
     TableIterator<String, ? extends Table.KeyValue<String, OmDirectoryInfo>>
             iterator = dirTable.iterator();
 
@@ -912,8 +940,11 @@ public final class OMFileRequest {
       }
     }
 
+    final long volumeId = metaMgr.getVolumeId(omKeyInfo.getVolumeName());
+    final long bucketId = metaMgr.getBucketId(omKeyInfo.getVolumeName(),
+            omKeyInfo.getBucketName());
     // Check fileTable entries for any sub paths.
-    String seekFileInDB = metaMgr.getOzonePathKey(
+    String seekFileInDB = metaMgr.getOzonePathKey(volumeId, bucketId,
             omKeyInfo.getObjectID(), "");
     TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
             iterator = fileTable.iterator();
@@ -936,6 +967,7 @@ public final class OMFileRequest {
   /**
    * Get parent id for the user given path.
    *
+   * @param volumeId       volume id
    * @param bucketId       bucket id
    * @param pathComponents fie path elements
    * @param keyName        user given key name
@@ -943,13 +975,14 @@ public final class OMFileRequest {
    * @return lastKnownParentID
    * @throws IOException DB failure or parent not exists in DirectoryTable
    */
-  public static long getParentID(long bucketId, Iterator<Path> pathComponents,
+  public static long getParentID(long volumeId, long bucketId,
+                                 Iterator<Path> pathComponents,
                                  String keyName,
                                  OMMetadataManager omMetadataManager)
       throws IOException {
 
-    return getParentID(bucketId, pathComponents, keyName, omMetadataManager,
-        null);
+    return getParentID(volumeId, bucketId, pathComponents, keyName,
+            omMetadataManager, null);
   }
 
   /**
@@ -962,8 +995,9 @@ public final class OMFileRequest {
    * @return lastKnownParentID
    * @throws IOException DB failure or parent not exists in DirectoryTable
    */
-  public static long getParentID(long bucketId, Iterator<Path> pathComponents,
-      String keyName, OMMetadataManager omMetadataManager, String errMsg)
+  public static long getParentID(long volumeId, long bucketId,
+      Iterator<Path> pathComponents, String keyName,
+      OMMetadataManager omMetadataManager, String errMsg)
       throws IOException {
 
     long lastKnownParentId = bucketId;
@@ -980,7 +1014,9 @@ public final class OMFileRequest {
       String nodeName = pathComponents.next().toString();
       boolean reachedLastPathComponent = !pathComponents.hasNext();
       String dbNodeName =
-              omMetadataManager.getOzonePathKey(lastKnownParentId, nodeName);
+              omMetadataManager.getOzonePathKey(volumeId, bucketId,
+                      lastKnownParentId, nodeName);
+
 
       omDirectoryInfo = omMetadataManager.
               getDirectoryTable().get(dbNodeName);
@@ -1020,14 +1056,13 @@ public final class OMFileRequest {
                                  String keyName)
       throws IOException {
 
-    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
-    OmBucketInfo omBucketInfo =
-        omMetadataManager.getBucketTable().get(bucketKey);
 
-    long bucketId = omBucketInfo.getObjectID();
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
     Iterator<Path> pathComponents = Paths.get(keyName).iterator();
-    return OMFileRequest
-        .getParentID(bucketId, pathComponents, keyName, omMetadataManager);
+    return OMFileRequest.getParentID(volumeId, bucketId,
+            pathComponents, keyName, omMetadataManager);
   }
 
   /**
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequestWithFSO.java
index 61187c8477..c4265a4679 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequestWithFSO.java
@@ -205,16 +205,15 @@ public class OMAllocateBlockRequestWithFSO extends OMAllocateBlockRequest {
       String keyName, long clientID, OzoneManager ozoneManager)
           throws IOException {
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
-    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
-    OmBucketInfo omBucketInfo =
-            omMetadataManager.getBucketTable().get(bucketKey);
-    long bucketId = omBucketInfo.getObjectID();
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(
+            volumeName, bucketName);
     String fileName = OzoneFSUtils.getFileName(keyName);
     Iterator<Path> pathComponents = Paths.get(keyName).iterator();
-    long parentID = OMFileRequest.getParentID(bucketId, pathComponents,
-            keyName, omMetadataManager);
-    return omMetadataManager.getOpenFileName(parentID, fileName,
-            clientID);
+    long parentID = OMFileRequest.getParentID(volumeId, bucketId,
+            pathComponents, keyName, omMetadataManager);
+    return omMetadataManager.getOpenFileName(volumeId, bucketId, parentID,
+            fileName, clientID);
   }
 
   private void addOpenTableCacheEntry(long trxnLogIndex,
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMPathsPurgeRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
similarity index 69%
rename from hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMPathsPurgeRequestWithFSO.java
rename to hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
index e1cd0f0122..0f96e097f1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMPathsPurgeRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
-import org.apache.hadoop.ozone.om.response.key.OMPathsPurgeResponseWithFSO;
+import org.apache.hadoop.ozone.om.response.key.OMDirectoriesPurgeResponseWithFSO;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
@@ -33,30 +33,27 @@ import java.util.List;
 /**
  * Handles purging of keys from OM DB.
  */
-public class OMPathsPurgeRequestWithFSO extends OMKeyRequest {
+public class OMDirectoriesPurgeRequestWithFSO extends OMKeyRequest {
 
-  public OMPathsPurgeRequestWithFSO(OMRequest omRequest) {
+  public OMDirectoriesPurgeRequestWithFSO(OMRequest omRequest) {
     super(omRequest, BucketLayout.FILE_SYSTEM_OPTIMIZED);
   }
 
   @Override
   public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
       long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
-    OzoneManagerProtocolProtos.PurgePathsRequest purgePathsRequest =
-        getOmRequest().getPurgePathsRequest();
+    OzoneManagerProtocolProtos.PurgeDirectoriesRequest purgeDirsRequest =
+        getOmRequest().getPurgeDirectoriesRequest();
 
-    List<String> deletedDirsList = purgePathsRequest.getDeletedDirsList();
-    List<OzoneManagerProtocolProtos.KeyInfo> deletedSubFilesList =
-        purgePathsRequest.getDeletedSubFilesList();
-    List<OzoneManagerProtocolProtos.KeyInfo> markDeletedSubDirsList =
-        purgePathsRequest.getMarkDeletedSubDirsList();
+    List<OzoneManagerProtocolProtos.PurgePathRequest> purgeRequests =
+            purgeDirsRequest.getDeletedPathList();
 
     OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
         getOmRequest());
 
-    OMClientResponse omClientResponse = new OMPathsPurgeResponseWithFSO(
-        omResponse.build(), markDeletedSubDirsList, deletedSubFilesList,
-        deletedDirsList, ozoneManager.isRatisEnabled(), getBucketLayout());
+    OMClientResponse omClientResponse = new OMDirectoriesPurgeResponseWithFSO(
+        omResponse.build(), purgeRequests, ozoneManager.isRatisEnabled(),
+            getBucketLayout());
     addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
         omDoubleBufferHelper);
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
index 3e4d35ee65..537849a7ca 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
@@ -122,13 +122,17 @@ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
 
       String fileName = OzoneFSUtils.getFileName(keyName);
       omBucketInfo = getBucketInfo(omMetadataManager, volumeName, bucketName);
-      long bucketId = omBucketInfo.getObjectID();
-      long parentID = OMFileRequest.getParentID(bucketId, pathComponents,
-          keyName, omMetadataManager, "Cannot create file : " + keyName
+      final long volumeId = omMetadataManager.getVolumeId(volumeName);
+      final long bucketId = omMetadataManager.getBucketId(
+              volumeName, bucketName);
+      long parentID = OMFileRequest.getParentID(volumeId, bucketId,
+              pathComponents, keyName, omMetadataManager,
+              "Cannot create file : " + keyName
               + " as parent directory doesn't exist");
-      String dbFileKey = omMetadataManager.getOzonePathKey(parentID, fileName);
-      dbOpenFileKey = omMetadataManager.getOpenFileName(parentID, fileName,
-              commitKeyRequest.getClientID());
+      String dbFileKey = omMetadataManager.getOzonePathKey(volumeId, bucketId,
+              parentID, fileName);
+      dbOpenFileKey = omMetadataManager.getOpenFileName(volumeId, bucketId,
+              parentID, fileName, commitKeyRequest.getClientID());
 
       omKeyInfo = OMFileRequest.getOmKeyInfoFromFileTable(true,
               omMetadataManager, dbOpenFileKey, keyName);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
index 895c61cf09..1895b0a1fa 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
@@ -111,6 +111,12 @@ public class OMKeyCreateRequestWithFSO extends OMKeyCreateRequest {
               volumeName, bucketName);
       validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
 
+      final long volumeId = omMetadataManager.getVolumeTable()
+              .get(omMetadataManager.getVolumeKey(volumeName)).getObjectID();
+      final long bucketId = omMetadataManager.getBucketTable()
+              .get(omMetadataManager.getBucketKey(volumeName, bucketName))
+              .getObjectID();
+
       OmKeyInfo dbFileInfo = null;
 
       OMFileRequest.OMPathInfoWithFSO pathInfoFSO =
@@ -119,7 +125,7 @@ public class OMKeyCreateRequestWithFSO extends OMKeyCreateRequest {
 
       if (pathInfoFSO.getDirectoryResult()
               == OMFileRequest.OMDirectoryResult.FILE_EXISTS) {
-        String dbFileKey = omMetadataManager.getOzonePathKey(
+        String dbFileKey = omMetadataManager.getOzonePathKey(volumeId, bucketId,
                 pathInfoFSO.getLastKnownParentId(),
                 pathInfoFSO.getLeafNodeName());
         dbFileInfo = OMFileRequest.getOmKeyInfoFromFileTable(false,
@@ -164,8 +170,9 @@ public class OMKeyCreateRequestWithFSO extends OMKeyCreateRequest {
       long openVersion = omFileInfo.getLatestVersionLocations().getVersion();
       long clientID = createKeyRequest.getClientID();
       String dbOpenFileName = omMetadataManager
-          .getOpenFileName(pathInfoFSO.getLastKnownParentId(),
-              pathInfoFSO.getLeafNodeName(), clientID);
+          .getOpenFileName(volumeId, bucketId,
+                  pathInfoFSO.getLastKnownParentId(),
+                  pathInfoFSO.getLeafNodeName(), clientID);
 
       // Append new blocks
       List<OmKeyLocationInfo> newLocationList = keyArgs.getKeyLocationsList()
@@ -191,8 +198,8 @@ public class OMKeyCreateRequestWithFSO extends OMKeyCreateRequest {
       // Add cache entries for the prefix directories.
       // Skip adding for the file key itself, until Key Commit.
       OMFileRequest.addDirectoryTableCacheEntries(omMetadataManager,
-              Optional.absent(), Optional.of(missingParentInfos),
-              trxnLogIndex);
+              volumeId, bucketId, trxnLogIndex,
+              Optional.of(missingParentInfos), Optional.absent());
 
       // Prepare response. Sets user given full key name in the 'keyName'
       // attribute in response object.
@@ -252,11 +259,15 @@ public class OMKeyCreateRequestWithFSO extends OMKeyCreateRequest {
                                          OMMetadataManager omMetadataManager)
       throws IOException {
 
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
     long parentId =
         getParentId(omMetadataManager, volumeName, bucketName, keyName);
 
     String fileName = OzoneFSUtils.getFileName(keyName);
 
-    return omMetadataManager.getMultipartKey(parentId, fileName, uploadID);
+    return omMetadataManager.getMultipartKey(volumeId, bucketId, parentId,
+            fileName, uploadID);
   }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequestWithFSO.java
index 51443a0691..dda99b3da3 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequestWithFSO.java
@@ -127,8 +127,12 @@ public class OMKeyDeleteRequestWithFSO extends OMKeyDeleteRequest {
       // Set the UpdateID to current transactionLogIndex
       omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
 
-      String ozonePathKey = omMetadataManager.getOzonePathKey(
-              omKeyInfo.getParentObjectID(), omKeyInfo.getFileName());
+      final long volumeId = omMetadataManager.getVolumeId(volumeName);
+      final long bucketId = omMetadataManager.getBucketId(volumeName,
+              bucketName);
+      String ozonePathKey = omMetadataManager.getOzonePathKey(volumeId,
+              bucketId, omKeyInfo.getParentObjectID(),
+              omKeyInfo.getFileName());
 
       if (keyStatus.isDirectory()) {
         // Check if there are any sub path exists under the user requested path
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestWithFSO.java
index 37497ded9b..b83a2d4e2f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestWithFSO.java
@@ -246,9 +246,14 @@ public class OMKeyRenameRequestWithFSO extends OMKeyRenameRequest {
   private OMClientResponse renameKey(long toKeyParentId,
       long trxnLogIndex, OmKeyInfo fromKeyValue, boolean isRenameDirectory,
       String toKeyName, long modificationTime, OMResponse.Builder omResponse,
-      OzoneManager ozoneManager) {
-
-    String dbFromKey = fromKeyValue.getPath();
+      OzoneManager ozoneManager) throws IOException {
+
+    final OMMetadataManager ommm = ozoneManager.getMetadataManager();
+    final long volumeId = ommm.getVolumeId(fromKeyValue.getVolumeName());
+    final long bucketId = ommm.getBucketId(fromKeyValue.getVolumeName(),
+            fromKeyValue.getBucketName());
+    final String dbFromKey = ommm.getOzonePathKey(volumeId, bucketId,
+            fromKeyValue.getParentObjectID(), fromKeyValue.getFileName());
     String toKeyFileName = OzoneFSUtils.getFileName(toKeyName);
 
     fromKeyValue.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
@@ -261,7 +266,8 @@ public class OMKeyRenameRequestWithFSO extends OMKeyRenameRequest {
     fromKeyValue.setModificationTime(modificationTime);
 
     // destination dbKeyName
-    String dbToKey = fromKeyValue.getPath();
+    String dbToKey = ommm.getOzonePathKey(volumeId, bucketId,
+            fromKeyValue.getParentObjectID(), toKeyFileName);
 
     // Add to cache.
     // dbFromKey should be deleted, dbToKey should be added with newly updated
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index 8a90246d15..b1e13ca2e9 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -739,8 +739,12 @@ public abstract class OMKeyRequest extends OMClientRequest {
     Preconditions.checkNotNull(uploadID);
     String multipartKey = "";
     if (omPathInfo != null) {
+      final long volumeId = omMetadataManager.getVolumeId(
+              args.getVolumeName());
+      final long bucketId = omMetadataManager.getBucketId(
+              args.getVolumeName(), args.getBucketName());
       // FileTable metadata format
-      multipartKey = omMetadataManager.getMultipartKey(
+      multipartKey = omMetadataManager.getMultipartKey(volumeId, bucketId,
               omPathInfo.getLastKnownParentId(),
               omPathInfo.getLeafNodeName(), uploadID);
     } else {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
index 1781612d0b..14d3fcbd8d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
@@ -266,7 +266,8 @@ public class OMKeysDeleteRequest extends OMKeyRequest {
 
   protected long markKeysAsDeletedInCache(OzoneManager ozoneManager,
       long trxnLogIndex, List<OmKeyInfo> omKeyInfoList, List<OmKeyInfo> dirList,
-      OMMetadataManager omMetadataManager, long quotaReleased) {
+      OMMetadataManager omMetadataManager, long quotaReleased)
+          throws IOException {
     for (OmKeyInfo omKeyInfo : omKeyInfoList) {
       omMetadataManager.getKeyTable(getBucketLayout()).addCacheEntry(
           new CacheKey<>(omMetadataManager
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OmKeysDeleteRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OmKeysDeleteRequestWithFSO.java
index a6c337e762..3e4c62f9cf 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OmKeysDeleteRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OmKeysDeleteRequestWithFSO.java
@@ -92,17 +92,23 @@ public class OmKeysDeleteRequestWithFSO extends OMKeysDeleteRequest {
   }
 
   @Override
-  protected long markKeysAsDeletedInCache(OzoneManager ozoneManager,
-      long trxnLogIndex,
-      List<OmKeyInfo> omKeyInfoList, List<OmKeyInfo> dirList,
-      OMMetadataManager omMetadataManager, long quotaReleased) {
+  protected long markKeysAsDeletedInCache(
+          OzoneManager ozoneManager, long trxnLogIndex,
+          List<OmKeyInfo> omKeyInfoList,
+          List<OmKeyInfo> dirList, OMMetadataManager omMetadataManager,
+          long quotaReleased) throws IOException {
 
     // Mark all keys which can be deleted, in cache as deleted.
     for (OmKeyInfo omKeyInfo : omKeyInfoList) {
+      final long volumeId = omMetadataManager.getVolumeId(
+              omKeyInfo.getVolumeName());
+      final long bucketId = omMetadataManager.getBucketId(
+              omKeyInfo.getVolumeName(), omKeyInfo.getBucketName());
       omMetadataManager.getKeyTable(getBucketLayout()).addCacheEntry(
           new CacheKey<>(omMetadataManager
-              .getOzonePathKey(omKeyInfo.getParentObjectID(),
-                  omKeyInfo.getFileName())),
+              .getOzonePathKey(volumeId, bucketId,
+                      omKeyInfo.getParentObjectID(),
+                      omKeyInfo.getFileName())),
           new CacheValue<>(Optional.absent(), trxnLogIndex));
 
       omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
@@ -110,8 +116,13 @@ public class OmKeysDeleteRequestWithFSO extends OMKeysDeleteRequest {
     }
     // Mark directory keys.
     for (OmKeyInfo omKeyInfo : dirList) {
+      final long volumeId = omMetadataManager.getVolumeId(
+              omKeyInfo.getVolumeName());
+      final long bucketId = omMetadataManager.getBucketId(
+              omKeyInfo.getVolumeName(), omKeyInfo.getBucketName());
       omMetadataManager.getDirectoryTable().addCacheEntry(new CacheKey<>(
-              omMetadataManager.getOzonePathKey(omKeyInfo.getParentObjectID(),
+              omMetadataManager.getOzonePathKey(volumeId, bucketId,
+                      omKeyInfo.getParentObjectID(),
                   omKeyInfo.getFileName())),
           new CacheValue<>(Optional.absent(), trxnLogIndex));
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAclRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAclRequestWithFSO.java
index 8bcfc98af1..6b831873c5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAclRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAclRequestWithFSO.java
@@ -93,7 +93,10 @@ public abstract class OMKeyAclRequestWithFSO extends OMKeyAclRequest {
         throw new OMException("Key not found. Key:" + key, KEY_NOT_FOUND);
       }
       omKeyInfo = keyStatus.getKeyInfo();
-      String dbKey = omKeyInfo.getPath();
+      final long volumeId = omMetadataManager.getVolumeId(volume);
+      final long bucketId = omMetadataManager.getBucketId(volume, bucket);
+      final String dbKey = omMetadataManager.getOzonePathKey(volumeId, bucketId,
+              omKeyInfo.getParentObjectID(), omKeyInfo.getFileName());
       boolean isDirectory = keyStatus.isDirectory();
       operationResult = apply(omKeyInfo, trxnLogIndex);
       omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java
index 3f73ffbf5c..0aa0f63041 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java
@@ -145,9 +145,15 @@ public class S3InitiateMultipartUploadRequestWithFSO
           volumeName, bucketName, keyName,
           keyArgs.getMultipartUploadID());
 
+      final long volumeId = omMetadataManager.getVolumeId(volumeName);
+      final long bucketId = omMetadataManager.getBucketId(volumeName,
+              bucketName);
+
       String multipartOpenKey = omMetadataManager
-          .getMultipartKey(pathInfoFSO.getLastKnownParentId(),
-              pathInfoFSO.getLeafNodeName(), keyArgs.getMultipartUploadID());
+          .getMultipartKey(volumeId, bucketId,
+                  pathInfoFSO.getLastKnownParentId(),
+                  pathInfoFSO.getLeafNodeName(),
+                  keyArgs.getMultipartUploadID());
 
       // Even if this key already exists in the KeyTable, it would be taken
       // care of in the final complete multipart upload. AWS S3 behavior is
@@ -191,8 +197,8 @@ public class S3InitiateMultipartUploadRequestWithFSO
       // Add cache entries for the prefix directories.
       // Skip adding for the file key itself, until Key Commit.
       OMFileRequest.addDirectoryTableCacheEntries(omMetadataManager,
-              Optional.absent(), Optional.of(missingParentInfos),
-              transactionLogIndex);
+              volumeId, bucketId, transactionLogIndex,
+              Optional.of(missingParentInfos), Optional.absent());
 
       OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager,
           multipartOpenKey, omKeyInfo, pathInfoFSO.getLeafNodeName(),
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequestWithFSO.java
index 62999cfc59..f15b5dabbb 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequestWithFSO.java
@@ -76,15 +76,14 @@ public class S3MultipartUploadAbortRequestWithFSO
 
     String fileName = OzoneFSUtils.getFileName(keyName);
     Iterator<Path> pathComponents = Paths.get(keyName).iterator();
-    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
-    OmBucketInfo omBucketInfo =
-        omMetadataManager.getBucketTable().get(bucketKey);
-    long bucketId = omBucketInfo.getObjectID();
-    long parentID = OMFileRequest.getParentID(bucketId, pathComponents,
-        keyName, omMetadataManager);
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
+    long parentID = OMFileRequest.getParentID(volumeId, bucketId,
+            pathComponents, keyName, omMetadataManager);
 
-    String multipartKey = omMetadataManager.getMultipartKey(parentID,
-        fileName, multipartUploadID);
+    String multipartKey = omMetadataManager.getMultipartKey(volumeId, bucketId,
+            parentID, fileName, multipartUploadID);
 
     return multipartKey;
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequestWithFSO.java
index 4b30068d99..b69c2e7f03 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequestWithFSO.java
@@ -54,14 +54,15 @@ public class S3MultipartUploadCommitPartRequestWithFSO
 
     String fileName = OzoneFSUtils.getFileName(keyName);
     Iterator<Path> pathComponents = Paths.get(keyName).iterator();
-    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
-    OmBucketInfo omBucketInfo =
-        omMetadataManager.getBucketTable().get(bucketKey);
-    long bucketId = omBucketInfo.getObjectID();
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
     long parentID = OMFileRequest
-        .getParentID(bucketId, pathComponents, keyName, omMetadataManager);
+        .getParentID(volumeId, bucketId, pathComponents, keyName,
+                omMetadataManager);
 
-    return omMetadataManager.getOpenFileName(parentID, fileName, clientID);
+    return omMetadataManager.getOpenFileName(volumeId, bucketId,
+            parentID, fileName, clientID);
   }
 
   @Override
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequestWithFSO.java
index 18302c5616..0e25276a6b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequestWithFSO.java
@@ -119,7 +119,11 @@ public class S3MultipartUploadCompleteRequestWithFSO
       fileName = filePath.toString();
     }
 
-    return omMetadataManager.getOzonePathKey(parentId, fileName);
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
+    return omMetadataManager.getOzonePathKey(volumeId, bucketId,
+            parentId, fileName);
   }
 
   @Override
@@ -136,7 +140,12 @@ public class S3MultipartUploadCompleteRequestWithFSO
       fileName = filePath.toString();
     }
 
-    return omMetadataManager.getMultipartKey(parentId, fileName, uploadID);
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
+
+    return omMetadataManager.getMultipartKey(volumeId, bucketId,
+            parentId, fileName, uploadID);
   }
 
   @Override
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMDirectoryCreateResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMDirectoryCreateResponseWithFSO.java
index f136227f50..f7bf62e740 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMDirectoryCreateResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMDirectoryCreateResponseWithFSO.java
@@ -47,8 +47,12 @@ public class OMDirectoryCreateResponseWithFSO extends OmKeyResponse {
   private OmDirectoryInfo dirInfo;
   private List<OmDirectoryInfo> parentDirInfos;
   private Result result;
+  private String volume;
+  private String bucket;
 
   public OMDirectoryCreateResponseWithFSO(@Nonnull OMResponse omResponse,
+                                          @Nonnull String volume,
+                                          @Nonnull String bucket,
                                           @Nonnull OmDirectoryInfo dirInfo,
                                           @Nonnull
                                               List<OmDirectoryInfo> pDirInfos,
@@ -58,6 +62,8 @@ public class OMDirectoryCreateResponseWithFSO extends OmKeyResponse {
     this.dirInfo = dirInfo;
     this.parentDirInfos = pDirInfos;
     this.result = result;
+    this.volume = volume;
+    this.bucket = bucket;
   }
 
   /**
@@ -80,10 +86,13 @@ public class OMDirectoryCreateResponseWithFSO extends OmKeyResponse {
                                 BatchOperation batchOperation)
           throws IOException {
     if (dirInfo != null) {
+      final long volumeId = omMetadataManager.getVolumeId(volume);
+      final long bucketId = omMetadataManager.getBucketId(volume, bucket);
       if (parentDirInfos != null) {
         for (OmDirectoryInfo parentDirInfo : parentDirInfos) {
           String parentKey = omMetadataManager
-                  .getOzonePathKey(parentDirInfo.getParentObjectID(),
+                  .getOzonePathKey(volumeId, bucketId,
+                          parentDirInfo.getParentObjectID(),
                           parentDirInfo.getName());
           LOG.debug("putWithBatch parent : dir {} info : {}", parentKey,
                   parentDirInfo);
@@ -92,7 +101,7 @@ public class OMDirectoryCreateResponseWithFSO extends OmKeyResponse {
         }
       }
 
-      String dirKey = omMetadataManager.getOzonePathKey(
+      String dirKey = omMetadataManager.getOzonePathKey(volumeId, bucketId,
               dirInfo.getParentObjectID(), dirInfo.getName());
       omMetadataManager.getDirectoryTable().putWithBatch(batchOperation, dirKey,
               dirInfo);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMFileCreateResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMFileCreateResponseWithFSO.java
index 1eb03694f3..b54d22e837 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMFileCreateResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMFileCreateResponseWithFSO.java
@@ -75,8 +75,13 @@ public class OMFileCreateResponseWithFSO extends OMFileCreateResponse {
      * XXX handle stale directory entries.
      */
     if (parentDirInfos != null) {
+      final long volumeId = omMetadataMgr.getVolumeId(
+              getOmKeyInfo().getVolumeName());
+      final long bucketId = omMetadataMgr.getBucketId(
+              getOmKeyInfo().getVolumeName(), getOmKeyInfo().getBucketName());
       for (OmDirectoryInfo parentDirInfo : parentDirInfos) {
-        String parentKey = parentDirInfo.getPath();
+        String parentKey = omMetadataMgr.getOzonePathKey(volumeId, bucketId,
+                parentDirInfo.getParentObjectID(), parentDirInfo.getName());
         if (LOG.isDebugEnabled()) {
           LOG.debug("putWithBatch adding parent : key {} info : {}", parentKey,
                   parentDirInfo);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMDirectoriesPurgeResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMDirectoriesPurgeResponseWithFSO.java
new file mode 100644
index 0000000000..331559bfde
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMDirectoriesPurgeResponseWithFSO.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.key;
+
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.request.key.OMDirectoriesPurgeRequestWithFSO;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_DIR_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DIRECTORY_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE;
+
+/**
+ * Response for {@link OMDirectoriesPurgeRequestWithFSO} request.
+ */
+@CleanupTableInfo(cleanupTables = {DELETED_TABLE, DELETED_DIR_TABLE,
+    DIRECTORY_TABLE, FILE_TABLE})
+public class OMDirectoriesPurgeResponseWithFSO extends OmKeyResponse {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMDirectoriesPurgeResponseWithFSO.class);
+
+  private List<OzoneManagerProtocolProtos.PurgePathRequest> paths;
+  private boolean isRatisEnabled;
+
+
+  public OMDirectoriesPurgeResponseWithFSO(@Nonnull OMResponse omResponse,
+      @Nonnull List<OzoneManagerProtocolProtos.PurgePathRequest> paths,
+      boolean isRatisEnabled, @Nonnull BucketLayout bucketLayout) {
+    super(omResponse, bucketLayout);
+    this.paths = paths;
+    this.isRatisEnabled = isRatisEnabled;
+  }
+
+  @Override
+  public void addToDBBatch(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+
+    for (OzoneManagerProtocolProtos.PurgePathRequest path : paths) {
+      final long volumeId = path.getVolumeId();
+      final long bucketId = path.getBucketId();
+
+      final List<OzoneManagerProtocolProtos.KeyInfo> deletedSubFilesList =
+              path.getDeletedSubFilesList();
+      final List<OzoneManagerProtocolProtos.KeyInfo> markDeletedSubDirsList =
+              path.getMarkDeletedSubDirsList();
+
+      // Add all sub-directories to deleted directory table.
+      for (OzoneManagerProtocolProtos.KeyInfo key : markDeletedSubDirsList) {
+        OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf(key);
+        String ozoneDbKey = omMetadataManager.getOzonePathKey(volumeId,
+                bucketId, keyInfo.getParentObjectID(), keyInfo.getFileName());
+        omMetadataManager.getDeletedDirTable().putWithBatch(batchOperation,
+                ozoneDbKey, keyInfo);
+
+        omMetadataManager.getDirectoryTable().deleteWithBatch(batchOperation,
+                ozoneDbKey);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("markDeletedDirList KeyName: {}, DBKey: {}",
+                  keyInfo.getKeyName(), ozoneDbKey);
+        }
+      }
+
+      for (OzoneManagerProtocolProtos.KeyInfo key : deletedSubFilesList) {
+        OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf(key);
+        String ozoneDbKey = omMetadataManager.getOzonePathKey(volumeId,
+                bucketId, keyInfo.getParentObjectID(), keyInfo.getFileName());
+        omMetadataManager.getKeyTable(getBucketLayout())
+                .deleteWithBatch(batchOperation, ozoneDbKey);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Move keyName:{} to DeletedTable DBKey: {}",
+                  keyInfo.getKeyName(), ozoneDbKey);
+        }
+
+        RepeatedOmKeyInfo repeatedOmKeyInfo = OmUtils.prepareKeyForDelete(
+                keyInfo, null, keyInfo.getUpdateID(), isRatisEnabled);
+
+        String deletedKey = omMetadataManager
+                .getOzoneKey(keyInfo.getVolumeName(), keyInfo.getBucketName(),
+                        keyInfo.getKeyName());
+
+        omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
+                deletedKey, repeatedOmKeyInfo);
+
+      }
+
+      // Delete the visited directory from deleted directory table
+      if (path.hasDeletedDir()) {
+        omMetadataManager.getDeletedDirTable().deleteWithBatch(batchOperation,
+                path.getDeletedDir());
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Purge Deleted Directory DBKey: {}", path.getDeletedDir());
+        }
+      }
+    }
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponseWithFSO.java
index b0d9754f24..5c521c3179 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponseWithFSO.java
@@ -68,10 +68,17 @@ public class OMKeyDeleteResponseWithFSO extends OMKeyDeleteResponse {
   public void addToDBBatch(OMMetadataManager omMetadataManager,
       BatchOperation batchOperation) throws IOException {
 
+    final long volumeId = omMetadataManager.getVolumeId(
+            getOmKeyInfo().getVolumeName());
+    final long bucketId = omMetadataManager.getBucketId(
+            getOmKeyInfo().getVolumeName(),
+            getOmKeyInfo().getBucketName());
+
     // For OmResponse with failure, this should do nothing. This method is
     // not called in failure scenario in OM code.
-    String ozoneDbKey = omMetadataManager.getOzonePathKey(
-            getOmKeyInfo().getParentObjectID(), getOmKeyInfo().getFileName());
+    String ozoneDbKey = omMetadataManager.getOzonePathKey(volumeId,
+            bucketId, getOmKeyInfo().getParentObjectID(),
+            getOmKeyInfo().getFileName());
 
     if (isDeleteDirectory) {
       omMetadataManager.getDirectoryTable().deleteWithBatch(batchOperation,
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponseWithFSO.java
index 0c436aad75..25e1de3541 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponseWithFSO.java
@@ -62,7 +62,11 @@ public class OMKeysDeleteResponseWithFSO extends OMKeysDeleteResponse {
 
     // remove dirs from DirTable and add to DeletedDirTable
     for (OmKeyInfo omKeyInfo : dirsList) {
-      String ozoneDbKey = omMetadataManager.getOzonePathKey(
+      final long volumeId = omMetadataManager.getVolumeId(
+              omKeyInfo.getVolumeName());
+      final long bucketId = omMetadataManager.getBucketId(
+              omKeyInfo.getVolumeName(), omKeyInfo.getBucketName());
+      String ozoneDbKey = omMetadataManager.getOzonePathKey(volumeId, bucketId,
           omKeyInfo.getParentObjectID(), omKeyInfo.getFileName());
       omMetadataManager.getDirectoryTable().deleteWithBatch(batchOperation,
           ozoneDbKey);
@@ -72,7 +76,11 @@ public class OMKeysDeleteResponseWithFSO extends OMKeysDeleteResponse {
 
     // remove keys from FileTable and add to DeletedTable
     for (OmKeyInfo omKeyInfo : getOmKeyInfoList()) {
-      String ozoneDbKey = omMetadataManager.getOzonePathKey(
+      final long volumeId = omMetadataManager.getVolumeId(
+              omKeyInfo.getVolumeName());
+      final long bucketId = omMetadataManager.getBucketId(
+              omKeyInfo.getVolumeName(), omKeyInfo.getBucketName());
+      String ozoneDbKey = omMetadataManager.getOzonePathKey(volumeId, bucketId,
           omKeyInfo.getParentObjectID(), omKeyInfo.getFileName());
       String deletedKey = omMetadataManager
           .getOzoneKey(omKeyInfo.getVolumeName(), omKeyInfo.getBucketName(),
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMPathsPurgeResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMPathsPurgeResponseWithFSO.java
deleted file mode 100644
index c487ca0629..0000000000
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMPathsPurgeResponseWithFSO.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.om.response.key;
-
-import org.apache.hadoop.hdds.utils.db.BatchOperation;
-import org.apache.hadoop.ozone.OmUtils;
-import org.apache.hadoop.ozone.om.OMMetadataManager;
-import org.apache.hadoop.ozone.om.helpers.BucketLayout;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
-import org.apache.hadoop.ozone.om.request.key.OMPathsPurgeRequestWithFSO;
-import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nonnull;
-import java.io.IOException;
-import java.util.List;
-
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_DIR_TABLE;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DIRECTORY_TABLE;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE;
-
-/**
- * Response for {@link OMPathsPurgeRequestWithFSO} request.
- */
-@CleanupTableInfo(cleanupTables = {DELETED_TABLE, DELETED_DIR_TABLE,
-    DIRECTORY_TABLE, FILE_TABLE})
-public class OMPathsPurgeResponseWithFSO extends OmKeyResponse {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(OMPathsPurgeResponseWithFSO.class);
-
-  private List<OzoneManagerProtocolProtos.KeyInfo> markDeletedDirList;
-  private List<String> dirList;
-  private List<OzoneManagerProtocolProtos.KeyInfo> fileList;
-  private boolean isRatisEnabled;
-
-
-  public OMPathsPurgeResponseWithFSO(@Nonnull OMResponse omResponse,
-      @Nonnull List<OzoneManagerProtocolProtos.KeyInfo> markDeletedDirs,
-      @Nonnull List<OzoneManagerProtocolProtos.KeyInfo> files,
-      @Nonnull List<String> dirs, boolean isRatisEnabled,
-      @Nonnull BucketLayout bucketLayout) {
-    super(omResponse, bucketLayout);
-    this.markDeletedDirList = markDeletedDirs;
-    this.dirList = dirs;
-    this.fileList = files;
-    this.isRatisEnabled = isRatisEnabled;
-  }
-
-  @Override
-  public void addToDBBatch(OMMetadataManager omMetadataManager,
-      BatchOperation batchOperation) throws IOException {
-
-    // Add all sub-directories to deleted directory table.
-    for (OzoneManagerProtocolProtos.KeyInfo key : markDeletedDirList) {
-      OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf(key);
-      String ozoneDbKey = omMetadataManager.getOzonePathKey(
-          keyInfo.getParentObjectID(), keyInfo.getFileName());
-      omMetadataManager.getDeletedDirTable().putWithBatch(batchOperation,
-          ozoneDbKey, keyInfo);
-
-      omMetadataManager.getDirectoryTable().deleteWithBatch(batchOperation,
-          ozoneDbKey);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("markDeletedDirList KeyName: {}, DBKey: {}",
-            keyInfo.getKeyName(), ozoneDbKey);
-      }
-    }
-
-    // Delete all the visited directories from deleted directory table
-    for (String key : dirList) {
-      omMetadataManager.getDeletedDirTable().deleteWithBatch(batchOperation,
-          key);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.info("Purge Deleted Directory DBKey: {}", key);
-      }
-    }
-    for (OzoneManagerProtocolProtos.KeyInfo key : fileList) {
-      OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf(key);
-      String ozoneDbKey = omMetadataManager.getOzonePathKey(
-          keyInfo.getParentObjectID(), keyInfo.getFileName());
-      omMetadataManager.getKeyTable(getBucketLayout())
-          .deleteWithBatch(batchOperation, ozoneDbKey);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.info("Move keyName:{} to DeletedTable DBKey: {}",
-            keyInfo.getKeyName(), ozoneDbKey);
-      }
-
-      RepeatedOmKeyInfo repeatedOmKeyInfo = null;
-      repeatedOmKeyInfo = OmUtils.prepareKeyForDelete(keyInfo,
-          repeatedOmKeyInfo, keyInfo.getUpdateID(), isRatisEnabled);
-
-      String deletedKey = omMetadataManager
-          .getOzoneKey(keyInfo.getVolumeName(), keyInfo.getBucketName(),
-              keyInfo.getKeyName());
-
-      omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
-          deletedKey, repeatedOmKeyInfo);
-
-    }
-  }
-}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/acl/OMKeyAclResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/acl/OMKeyAclResponseWithFSO.java
index e5f24a8d3a..2d86defdd6 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/acl/OMKeyAclResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/acl/OMKeyAclResponseWithFSO.java
@@ -64,9 +64,14 @@ public class OMKeyAclResponseWithFSO extends OMKeyAclResponse {
   @Override public void addToDBBatch(OMMetadataManager omMetadataManager,
       BatchOperation batchOperation) throws IOException {
 
+    final long volumeId = omMetadataManager.getVolumeId(
+            getOmKeyInfo().getVolumeName());
+    final long bucketId = omMetadataManager.getBucketId(
+            getOmKeyInfo().getVolumeName(), getOmKeyInfo().getBucketName());
     String ozoneDbKey = omMetadataManager
-        .getOzonePathKey(getOmKeyInfo().getParentObjectID(),
-            getOmKeyInfo().getFileName());
+        .getOzonePathKey(volumeId, bucketId,
+                getOmKeyInfo().getParentObjectID(),
+                getOmKeyInfo().getFileName());
     if (isDirectory) {
       OmDirectoryInfo dirInfo = OMFileRequest.getDirectoryInfo(getOmKeyInfo());
       omMetadataManager.getDirectoryTable()
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3InitiateMultipartUploadResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3InitiateMultipartUploadResponseWithFSO.java
index 250706d35c..9e54ac01b9 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3InitiateMultipartUploadResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3InitiateMultipartUploadResponseWithFSO.java
@@ -76,8 +76,15 @@ public class S3InitiateMultipartUploadResponseWithFSO extends
      * wait for File Commit request.
      */
     if (parentDirInfos != null) {
+      final OmKeyInfo keyInfo = getOmKeyInfo();
+      final long volumeId = omMetadataManager.getVolumeId(
+              keyInfo.getVolumeName());
+      final long bucketId = omMetadataManager.getBucketId(
+              keyInfo.getVolumeName(), keyInfo.getBucketName());
       for (OmDirectoryInfo parentDirInfo : parentDirInfos) {
-        String parentKey = parentDirInfo.getPath();
+        final String parentKey = omMetadataManager.getOzonePathKey(
+                volumeId, bucketId, parentDirInfo.getParentObjectID(),
+                parentDirInfo.getName());
         omMetadataManager.getDirectoryTable().putWithBatch(batchOperation,
                 parentKey, parentDirInfo);
       }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
index aa8e98c91d..6291f72620 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
@@ -562,6 +562,12 @@ public class TestOmMetadataManager {
       throws Exception {
     final String bucketName = UUID.randomUUID().toString();
     final String volumeName = UUID.randomUUID().toString();
+    // Add volume, bucket, key entries to DB.
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+            omMetadataManager, BucketLayout.FILE_SYSTEM_OPTIMIZED);
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
     final int numExpiredOpenKeys = 4;
     final int numUnexpiredOpenKeys = 1;
     final long clientID = 1000L;
@@ -594,7 +600,7 @@ public class TestOmMetadataManager {
         keyInfo.setFileName(OzoneFSUtils.getFileName(keyInfo.getKeyName()));
         OMRequestTestUtils.addFileToKeyTable(true, false,
             keyInfo.getFileName(), keyInfo, clientID, 0L, omMetadataManager);
-        dbOpenKeyName = omMetadataManager.getOpenFileName(
+        dbOpenKeyName = omMetadataManager.getOpenFileName(volumeId, bucketId,
             keyInfo.getParentObjectID(), keyInfo.getFileName(), clientID);
       } else {
         OMRequestTestUtils.addKeyToTable(true, false,
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
index 32fde202b1..76e7ab81ca 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
@@ -116,9 +116,15 @@ public final class OMRequestTestUtils {
       String bucketName, OMMetadataManager omMetadataManager,
       BucketLayout bucketLayout) throws Exception {
 
-    addVolumeToDB(volumeName, omMetadataManager);
+    if (!omMetadataManager.getVolumeTable().isExist(
+            omMetadataManager.getVolumeKey(volumeName))) {
+      addVolumeToDB(volumeName, omMetadataManager);
+    }
 
-    addBucketToDB(volumeName, bucketName, omMetadataManager, bucketLayout);
+    if (!omMetadataManager.getBucketTable().isExist(
+            omMetadataManager.getBucketKey(volumeName, bucketName))) {
+      addBucketToDB(volumeName, bucketName, omMetadataManager, bucketLayout);
+    }
   }
 
   @SuppressWarnings("parameterNumber")
@@ -313,10 +319,15 @@ public final class OMRequestTestUtils {
    */
   public static void addDirKeyToDirTable(boolean addToCache,
                                          OmDirectoryInfo omDirInfo,
+                                         String volume,
+                                         String bucket,
                                          long trxnLogIndex,
                                          OMMetadataManager omMetadataManager)
           throws Exception {
-    String ozoneKey = omDirInfo.getPath();
+    final long volumeId = omMetadataManager.getVolumeId(volume);
+    final long bucketId = omMetadataManager.getBucketId(volume, bucket);
+    final String ozoneKey = omMetadataManager.getOzonePathKey(volumeId,
+            bucketId, omDirInfo.getParentObjectID(), omDirInfo.getName());
     if (addToCache) {
       omMetadataManager.getDirectoryTable().addCacheEntry(
               new CacheKey<>(ozoneKey),
@@ -347,7 +358,7 @@ public final class OMRequestTestUtils {
             .setModificationTime(Time.now())
             .setObjectID(objectID)
             .setParentObjectID(parentObjID)
-            .setUpdateID(objectID)
+            .setUpdateID(50)
             .build();
   }
 
@@ -443,6 +454,7 @@ public final class OMRequestTestUtils {
     OmVolumeArgs omVolumeArgs =
         OmVolumeArgs.newBuilder().setCreationTime(Time.now())
             .setVolume(volumeName).setAdminName(ownerName)
+            .setObjectID(System.currentTimeMillis())
             .setOwnerName(ownerName).setQuotaInBytes(Long.MAX_VALUE)
             .setQuotaInNamespace(10000L).build();
     omMetadataManager.getVolumeTable().put(
@@ -474,8 +486,10 @@ public final class OMRequestTestUtils {
 
     OmBucketInfo omBucketInfo =
         OmBucketInfo.newBuilder().setVolumeName(volumeName)
-            .setBucketName(bucketName).setCreationTime(Time.now())
-            .setBucketLayout(bucketLayout).build();
+                .setBucketName(bucketName)
+                .setObjectID(System.currentTimeMillis())
+                .setCreationTime(Time.now())
+                .setBucketLayout(bucketLayout).build();
 
     // Add to cache.
     omMetadataManager.getBucketTable().addCacheEntry(
@@ -1142,8 +1156,13 @@ public final class OMRequestTestUtils {
                                        OMMetadataManager omMetadataManager)
           throws Exception {
     if (openKeyTable) {
-      String ozoneKey = omMetadataManager.getOpenFileName(
-              omKeyInfo.getParentObjectID(), fileName, clientID);
+      final long volumeId = omMetadataManager.getVolumeId(
+              omKeyInfo.getVolumeName());
+      final long bucketId = omMetadataManager.getBucketId(
+              omKeyInfo.getVolumeName(), omKeyInfo.getBucketName());
+      final String ozoneKey = omMetadataManager.getOpenFileName(
+              volumeId, bucketId, omKeyInfo.getParentObjectID(),
+              fileName, clientID);
       if (addToCache) {
         omMetadataManager.getOpenKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED)
             .addCacheEntry(new CacheKey<>(ozoneKey),
@@ -1153,6 +1172,9 @@ public final class OMRequestTestUtils {
           .put(ozoneKey, omKeyInfo);
     } else {
       String ozoneKey = omMetadataManager.getOzonePathKey(
+              omMetadataManager.getVolumeId(omKeyInfo.getVolumeName()),
+              omMetadataManager.getBucketId(omKeyInfo.getVolumeName(),
+                      omKeyInfo.getBucketName()),
               omKeyInfo.getParentObjectID(), fileName);
       if (addToCache) {
         omMetadataManager.getKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED)
@@ -1164,24 +1186,6 @@ public final class OMRequestTestUtils {
     }
   }
 
-  /**
-   * Gets bucketId from OM metadata manager.
-   *
-   * @param volumeName        volume name
-   * @param bucketName        bucket name
-   * @param omMetadataManager metadata manager
-   * @return bucket Id
-   * @throws Exception DB failure
-   */
-  public static long getBucketId(String volumeName, String bucketName,
-                                 OMMetadataManager omMetadataManager)
-          throws Exception {
-    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
-    OmBucketInfo omBucketInfo =
-            omMetadataManager.getBucketTable().get(bucketKey);
-    return omBucketInfo.getObjectID();
-  }
-
   /**
    * Add path components to the directory table and returns last directory's
    * object id.
@@ -1196,8 +1200,7 @@ public final class OMRequestTestUtils {
   public static long addParentsToDirTable(String volumeName, String bucketName,
                                     String key, OMMetadataManager omMetaMgr)
           throws Exception {
-    long bucketId = OMRequestTestUtils.getBucketId(volumeName, bucketName,
-            omMetaMgr);
+    long bucketId = omMetaMgr.getBucketId(volumeName, bucketName);
     if (org.apache.commons.lang3.StringUtils.isBlank(key)) {
       return bucketId;
     }
@@ -1210,7 +1213,7 @@ public final class OMRequestTestUtils {
               OMRequestTestUtils.createOmDirectoryInfo(pathElement, ++objectId,
                       parentId);
       OMRequestTestUtils.addDirKeyToDirTable(true, omDirInfo,
-              txnID, omMetaMgr);
+              volumeName, bucketName, txnID, omMetaMgr);
       parentId = omDirInfo.getObjectID();
     }
     return parentId;
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestBucketLayoutAwareOMKeyFactory.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestBucketLayoutAwareOMKeyFactory.java
index 3af9e66461..18491a98db 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestBucketLayoutAwareOMKeyFactory.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestBucketLayoutAwareOMKeyFactory.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.ozone.om.request;
 
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
-import org.apache.hadoop.ozone.om.request.key.OMPathsPurgeRequestWithFSO;
+import org.apache.hadoop.ozone.om.request.key.OMDirectoriesPurgeRequestWithFSO;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.junit.Assert;
@@ -132,8 +132,9 @@ public class TestBucketLayoutAwareOMKeyFactory {
       InstantiationException, IllegalAccessException {
     // Add an OMKeyRequest class that does not have a constructor compatible
     // with the Factory class.
-    addRequestClass(Type.PurgePaths, OMPathsPurgeRequestWithFSO.class,
-        BucketLayout.FILE_SYSTEM_OPTIMIZED);
+    addRequestClass(Type.PurgeDirectories,
+            OMDirectoriesPurgeRequestWithFSO.class,
+            BucketLayout.FILE_SYSTEM_OPTIMIZED);
     try {
       // This should fail, since this class does not have a valid constructor -
       // one that takes an OMRequest and a BucketLayout as parameters.
@@ -142,7 +143,7 @@ public class TestBucketLayoutAwareOMKeyFactory {
               .setCmdType(Type.PurgeKeys)
               .setClientId("xyz")
               .build(),
-          getKey(Type.PurgePaths, BucketLayout.FILE_SYSTEM_OPTIMIZED),
+          getKey(Type.PurgeDirectories, BucketLayout.FILE_SYSTEM_OPTIMIZED),
           BucketLayout.FILE_SYSTEM_OPTIMIZED);
       fail("No exception thrown for invalid OMKeyRequest class");
     } catch (NoSuchMethodException ex) {
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequestWithFSO.java
index f0becb9064..99652ed776 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequestWithFSO.java
@@ -141,10 +141,9 @@ public class TestOMDirectoryCreateRequestWithFSO {
     OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
             omMetadataManager, getBucketLayout());
 
-    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
-    OmBucketInfo omBucketInfo =
-            omMetadataManager.getBucketTable().get(bucketKey);
-    long bucketID = omBucketInfo.getObjectID();
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
 
     OMRequest omRequest = createDirectoryRequest(volumeName, bucketName,
             keyName);
@@ -165,7 +164,7 @@ public class TestOMDirectoryCreateRequestWithFSO {
 
     Assert.assertTrue(omClientResponse.getOMResponse().getStatus()
             == OzoneManagerProtocolProtos.Status.OK);
-    verifyDirectoriesInDB(dirs, bucketID);
+    verifyDirectoriesInDB(dirs, volumeId, bucketId);
   }
 
   @Test
@@ -244,22 +243,23 @@ public class TestOMDirectoryCreateRequestWithFSO {
     OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
             omMetadataManager, getBucketLayout());
 
-    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
-    OmBucketInfo omBucketInfo =
-            omMetadataManager.getBucketTable().get(bucketKey);
-    long bucketID = omBucketInfo.getObjectID();
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
     int objID = 100;
 
     //1. Create root
     OmDirectoryInfo omDirInfo =
             OMRequestTestUtils.createOmDirectoryInfo(dirs.get(0), objID++,
-                    bucketID);
-    OMRequestTestUtils.addDirKeyToDirTable(true, omDirInfo, 5000,
+                    bucketId);
+    OMRequestTestUtils.addDirKeyToDirTable(true, omDirInfo,
+            volumeName, bucketName, 5000,
             omMetadataManager);
     //2. Create sub-directory under root
     omDirInfo = OMRequestTestUtils.createOmDirectoryInfo(dirs.get(1), objID++,
             omDirInfo.getObjectID());
-    OMRequestTestUtils.addDirKeyToDirTable(true, omDirInfo, 5000,
+    OMRequestTestUtils.addDirKeyToDirTable(true, omDirInfo,
+            volumeName, bucketName, 5000,
             omMetadataManager);
 
     OMRequest omRequest = createDirectoryRequest(volumeName, bucketName,
@@ -281,7 +281,7 @@ public class TestOMDirectoryCreateRequestWithFSO {
             == OzoneManagerProtocolProtos.Status.OK);
 
     // Key should exist in DB and cache.
-    verifyDirectoriesInDB(dirs, bucketID);
+    verifyDirectoriesInDB(dirs, volumeId, bucketId);
   }
 
   @Test
@@ -296,13 +296,12 @@ public class TestOMDirectoryCreateRequestWithFSO {
     OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
             omMetadataManager, getBucketLayout());
 
-    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
-    OmBucketInfo omBucketInfo =
-            omMetadataManager.getBucketTable().get(bucketKey);
-    long bucketID = omBucketInfo.getObjectID();
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
 
     // bucketID is the parent
-    long parentID = bucketID;
+    long parentID = bucketId;
 
     // add all the directories into DirectoryTable
     for (int indx = 0; indx < dirs.size(); indx++) {
@@ -312,7 +311,7 @@ public class TestOMDirectoryCreateRequestWithFSO {
       OmDirectoryInfo omDirInfo = OMRequestTestUtils.createOmDirectoryInfo(
               dirs.get(indx), objID, parentID);
       OMRequestTestUtils.addDirKeyToDirTable(false, omDirInfo,
-              txnID, omMetadataManager);
+              volumeName, bucketName, txnID, omMetadataManager);
 
       parentID = omDirInfo.getObjectID();
     }
@@ -339,8 +338,8 @@ public class TestOMDirectoryCreateRequestWithFSO {
             0, ozoneManager.getMetrics().getNumKeys());
 
     // Key should exist in DB and doesn't added to cache.
-    verifyDirectoriesInDB(dirs, bucketID);
-    verifyDirectoriesNotInCache(dirs, bucketID);
+    verifyDirectoriesInDB(dirs, volumeId, bucketId);
+    verifyDirectoriesNotInCache(dirs, volumeId, bucketId);
   }
 
   /**
@@ -372,7 +371,7 @@ public class TestOMDirectoryCreateRequestWithFSO {
       OmDirectoryInfo omDirInfo = OMRequestTestUtils.createOmDirectoryInfo(
               dirs.get(indx), objID, parentID);
       OMRequestTestUtils.addDirKeyToDirTable(false, omDirInfo,
-              txnID, omMetadataManager);
+              volumeName, bucketName, txnID, omMetadataManager);
 
       parentID = omDirInfo.getObjectID();
     }
@@ -384,7 +383,11 @@ public class TestOMDirectoryCreateRequestWithFSO {
     OmKeyInfo omKeyInfo = OMRequestTestUtils.createOmKeyInfo(volumeName,
             bucketName, keyName, HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE, objID++);
-    String ozoneFileName = parentID + "/" + dirs.get(dirs.size() - 1);
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omBucketInfo.getObjectID();
+
+    final String ozoneFileName = omMetadataManager.getOzonePathKey(
+            volumeId, bucketId, parentID, dirs.get(dirs.size() - 1));
     ++txnID;
     omMetadataManager.getKeyTable(getBucketLayout())
         .addCacheEntry(new CacheKey<>(ozoneFileName),
@@ -451,7 +454,7 @@ public class TestOMDirectoryCreateRequestWithFSO {
     OmDirectoryInfo omDirInfo = OMRequestTestUtils.createOmDirectoryInfo(
             dirs.get(0), objID++, parentID);
     OMRequestTestUtils.addDirKeyToDirTable(true, omDirInfo,
-            txnID, omMetadataManager);
+            volumeName, bucketName, txnID, omMetadataManager);
     parentID = omDirInfo.getObjectID();
 
     // Add a key in second level.
@@ -459,7 +462,11 @@ public class TestOMDirectoryCreateRequestWithFSO {
             bucketName, keyName, HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE, objID);
 
-    String ozoneKey = parentID + "/" + dirs.get(1);
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omBucketInfo.getObjectID();
+
+    final String ozoneKey = omMetadataManager.getOzonePathKey(
+            volumeId, bucketId, parentID, dirs.get(1));
     ++txnID;
     omMetadataManager.getKeyTable(getBucketLayout())
         .addCacheEntry(new CacheKey<>(ozoneKey),
@@ -508,10 +515,9 @@ public class TestOMDirectoryCreateRequestWithFSO {
     // Add volume and bucket entries to DB.
     OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
             omMetadataManager, getBucketLayout());
-    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
-    OmBucketInfo omBucketInfo =
-            omMetadataManager.getBucketTable().get(bucketKey);
-    long bucketID = omBucketInfo.getObjectID();
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
 
     OMRequest omRequest = createDirectoryRequest(volumeName, bucketName,
             OzoneFSUtils.addTrailingSlashIfNeeded(keyName));
@@ -532,7 +538,7 @@ public class TestOMDirectoryCreateRequestWithFSO {
     Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
             omClientResponse.getOMResponse().getStatus());
 
-    verifyDirectoriesInDB(dirs, bucketID);
+    verifyDirectoriesInDB(dirs, volumeId, bucketId);
 
     Assert.assertEquals(dirs.size(), omMetrics.getNumKeys());
   }
@@ -583,10 +589,9 @@ public class TestOMDirectoryCreateRequestWithFSO {
     // Add volume and bucket entries to DB.
     OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
             omMetadataManager, getBucketLayout());
-    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
-    OmBucketInfo omBucketInfo =
-            omMetadataManager.getBucketTable().get(bucketKey);
-    long bucketID = omBucketInfo.getObjectID();
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
 
     OMRequest omRequest = createDirectoryRequest(volumeName, bucketName,
             OzoneFSUtils.addTrailingSlashIfNeeded(keyName));
@@ -607,7 +612,7 @@ public class TestOMDirectoryCreateRequestWithFSO {
     Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
             omClientResponse.getOMResponse().getStatus());
 
-    verifyDirectoriesInDB(dirs, bucketID);
+    verifyDirectoriesInDB(dirs, volumeId, bucketId);
 
     Assert.assertEquals(dirs.size(), omMetrics.getNumKeys());
   }
@@ -627,15 +632,17 @@ public class TestOMDirectoryCreateRequestWithFSO {
     return buf.toString();
   }
 
-  private void verifyDirectoriesInDB(List<String> dirs, long bucketID)
+  private void verifyDirectoriesInDB(List<String> dirs,
+                                     long volumeId, long bucketId)
           throws IOException {
     // bucketID is the parent
-    long parentID = bucketID;
+    long parentID = bucketId;
     for (int indx = 0; indx < dirs.size(); indx++) {
       String dirName = dirs.get(indx);
       String dbKey = "";
       // for index=0, parentID is bucketID
-      dbKey = omMetadataManager.getOzonePathKey(parentID, dirName);
+      dbKey = omMetadataManager.getOzonePathKey(volumeId, bucketId,
+              parentID, dirName);
       OmDirectoryInfo omDirInfo =
               omMetadataManager.getDirectoryTable().get(dbKey);
       Assert.assertNotNull("Invalid directory!", omDirInfo);
@@ -646,15 +653,17 @@ public class TestOMDirectoryCreateRequestWithFSO {
     }
   }
 
-  private void verifyDirectoriesNotInCache(List<String> dirs, long bucketID)
+  private void verifyDirectoriesNotInCache(List<String> dirs,
+                                           long volumeId, long bucketId)
           throws IOException {
     // bucketID is the parent
-    long parentID = bucketID;
+    long parentID = bucketId;
     for (int indx = 0; indx < dirs.size(); indx++) {
       String dirName = dirs.get(indx);
       String dbKey = "";
       // for index=0, parentID is bucketID
-      dbKey = omMetadataManager.getOzonePathKey(parentID, dirName);
+      dbKey = omMetadataManager.getOzonePathKey(volumeId, bucketId,
+              parentID, dirName);
       CacheValue<OmDirectoryInfo> omDirInfoCacheValue =
               omMetadataManager.getDirectoryTable()
                       .getCacheValue(new CacheKey<>(dbKey));
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMFileCreateRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMFileCreateRequestWithFSO.java
index 0c4d10fb12..89d34f520f 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMFileCreateRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMFileCreateRequestWithFSO.java
@@ -56,7 +56,10 @@ public class TestOMFileCreateRequestWithFSO extends TestOMFileCreateRequest {
     // Delete child key but retain path "a/b/ in the key table
     OmDirectoryInfo dirPathC = getDirInfo("a/b/c");
     Assert.assertNotNull("Failed to find dir path: a/b/c", dirPathC);
-    String dbFileD = omMetadataManager.getOzonePathKey(
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
+    String dbFileD = omMetadataManager.getOzonePathKey(volumeId, bucketId,
             dirPathC.getObjectID(), fileNameD);
     omMetadataManager.getKeyTable(getBucketLayout()).delete(dbFileD);
     omMetadataManager.getKeyTable(getBucketLayout()).delete(dirPathC.getPath());
@@ -126,8 +129,9 @@ public class TestOMFileCreateRequestWithFSO extends TestOMFileCreateRequest {
   protected OmKeyInfo verifyPathInOpenKeyTable(String key, long id,
                                              boolean doAssert)
           throws Exception {
-    long bucketId = OMRequestTestUtils.getBucketId(volumeName, bucketName,
-            omMetadataManager);
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
     String[] pathComponents = StringUtils.split(key, '/');
     long parentId = bucketId;
     for (int indx = 0; indx < pathComponents.length; indx++) {
@@ -135,7 +139,7 @@ public class TestOMFileCreateRequestWithFSO extends TestOMFileCreateRequest {
       // Reached last component, which is file name
       if (indx == pathComponents.length - 1) {
         String dbOpenFileName = omMetadataManager.getOpenFileName(
-                parentId, pathElement, id);
+                volumeId, bucketId, parentId, pathElement, id);
         OmKeyInfo omKeyInfo =
             omMetadataManager.getOpenKeyTable(getBucketLayout())
                 .get(dbOpenFileName);
@@ -145,8 +149,8 @@ public class TestOMFileCreateRequestWithFSO extends TestOMFileCreateRequest {
         return omKeyInfo;
       } else {
         // directory
-        String dbKey = omMetadataManager.getOzonePathKey(parentId,
-                pathElement);
+        String dbKey = omMetadataManager.getOzonePathKey(volumeId,
+                bucketId, parentId, pathElement);
         OmDirectoryInfo dirInfo =
                 omMetadataManager.getDirectoryTable().get(dbKey);
         parentId = dirInfo.getObjectID();
@@ -160,8 +164,9 @@ public class TestOMFileCreateRequestWithFSO extends TestOMFileCreateRequest {
 
   private OmDirectoryInfo getDirInfo(String key)
           throws Exception {
-    long bucketId = OMRequestTestUtils.getBucketId(volumeName, bucketName,
-            omMetadataManager);
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
     String[] pathComponents = StringUtils.split(key, '/');
     long parentId = bucketId;
     OmDirectoryInfo dirInfo = null;
@@ -169,8 +174,8 @@ public class TestOMFileCreateRequestWithFSO extends TestOMFileCreateRequest {
       String pathElement = pathComponents[indx];
       // Reached last component, which is file name
       // directory
-      String dbKey = omMetadataManager.getOzonePathKey(parentId,
-              pathElement);
+      String dbKey = omMetadataManager.getOzonePathKey(volumeId,
+              bucketId, parentId, pathElement);
       dirInfo =
               omMetadataManager.getDirectoryTable().get(dbKey);
       parentId = dirInfo.getObjectID();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMAllocateBlockRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMAllocateBlockRequestWithFSO.java
index 6dec84d79c..c318fbccb5 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMAllocateBlockRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMAllocateBlockRequestWithFSO.java
@@ -73,7 +73,11 @@ public class TestOMAllocateBlockRequestWithFSO
     OMRequestTestUtils.addFileToKeyTable(true, false,
             fileName, omKeyInfoFSO, clientID, txnLogId, omMetadataManager);
 
-    return omMetadataManager.getOzonePathKey(parentID, fileName);
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
+    return omMetadataManager.getOzonePathKey(volumeId, bucketId,
+            parentID, fileName);
   }
 
   @NotNull
@@ -92,8 +96,8 @@ public class TestOMAllocateBlockRequestWithFSO
   @Override
   protected OmKeyInfo verifyPathInOpenKeyTable(String key, long id,
       boolean doAssert) throws Exception {
-    long bucketId = OMRequestTestUtils.getBucketId(volumeName, bucketName,
-            omMetadataManager);
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName, bucketName);
     String[] pathComponents = StringUtils.split(key, '/');
     long parentId = bucketId;
     for (int indx = 0; indx < pathComponents.length; indx++) {
@@ -101,7 +105,8 @@ public class TestOMAllocateBlockRequestWithFSO
       // Reached last component, which is file name
       if (indx == pathComponents.length - 1) {
         String dbOpenFileName =
-            omMetadataManager.getOpenFileName(parentId, pathElement, id);
+            omMetadataManager.getOpenFileName(volumeId, bucketId,
+                    parentId, pathElement, id);
         OmKeyInfo omKeyInfo =
             omMetadataManager.getOpenKeyTable(getBucketLayout())
                 .get(dbOpenFileName);
@@ -111,7 +116,8 @@ public class TestOMAllocateBlockRequestWithFSO
         return omKeyInfo;
       } else {
         // directory
-        String dbKey = omMetadataManager.getOzonePathKey(parentId, pathElement);
+        String dbKey = omMetadataManager.getOzonePathKey(volumeId, bucketId,
+                parentId, pathElement);
         OmDirectoryInfo dirInfo =
             omMetadataManager.getDirectoryTable().get(dbKey);
         parentId = dirInfo.getObjectID();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyAclRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyAclRequestWithFSO.java
index d528926f7a..48d92e608b 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyAclRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyAclRequestWithFSO.java
@@ -51,7 +51,13 @@ public class TestOMKeyAclRequestWithFSO extends TestOMKeyAclRequest {
     OMRequestTestUtils
         .addFileToKeyTable(false, false, fileName, omKeyInfo, -1, 50,
             omMetadataManager);
-    return omKeyInfo.getPath();
+    final long volumeId = omMetadataManager.getVolumeId(
+            omKeyInfo.getVolumeName());
+    final long bucketId = omMetadataManager.getBucketId(
+            omKeyInfo.getVolumeName(), omKeyInfo.getBucketName());
+    return omMetadataManager.getOzonePathKey(
+            volumeId, bucketId, omKeyInfo.getParentObjectID(),
+            fileName);
   }
 
   @Override
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
index d4122c0d49..080f68a98b 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
@@ -206,7 +206,11 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
     OMKeyCommitRequest omKeyCommitRequest =
             getOmKeyCommitRequest(modifiedOmRequest);
 
-    String ozoneKey = getOzonePathKey();
+    final long volumeId = 100L;
+    final long bucketID = 1000L;
+    final String fileName = OzoneFSUtils.getFileName(keyName);
+    final String ozoneKey = omMetadataManager.getOzonePathKey(volumeId,
+            bucketID, bucketID, fileName);
 
     // Key should not be there in key table, as validateAndUpdateCache is
     // still not called.
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequestWithFSO.java
index f64250a9b4..bd5eb65c90 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequestWithFSO.java
@@ -41,6 +41,9 @@ public class TestOMKeyCommitRequestWithFSO extends TestOMKeyCommitRequest {
 
   private long parentID = Long.MIN_VALUE;
 
+  private long getVolumeID() throws IOException {
+    return omMetadataManager.getVolumeId(volumeName);
+  }
   private long getBucketID() throws java.io.IOException {
     String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
     OmBucketInfo omBucketInfo =
@@ -54,9 +57,11 @@ public class TestOMKeyCommitRequestWithFSO extends TestOMKeyCommitRequest {
 
   @Override
   protected String getOzonePathKey() throws IOException {
-    long bucketID = getBucketID();
+    final long volumeId = getVolumeID();
+    final long bucketID = getBucketID();
     String fileName = OzoneFSUtils.getFileName(keyName);
-    return omMetadataManager.getOzonePathKey(bucketID, fileName);
+    return omMetadataManager.getOzonePathKey(volumeId, bucketID,
+            bucketID, fileName);
   }
 
   @Override
@@ -82,7 +87,8 @@ public class TestOMKeyCommitRequestWithFSO extends TestOMKeyCommitRequest {
     OMRequestTestUtils.addFileToKeyTable(true, false,
             fileName, omKeyInfoFSO, clientID, txnLogId, omMetadataManager);
 
-    return omMetadataManager.getOzonePathKey(parentID, fileName);
+    return omMetadataManager.getOzonePathKey(getVolumeID(), getBucketID(),
+            parentID, fileName);
   }
 
   @NotNull
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestWithFSO.java
index 2ddc5c71f9..abc1a34d90 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestWithFSO.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
@@ -74,12 +75,16 @@ public class TestOMKeyCreateRequestWithFSO extends TestOMKeyCreateRequest {
     Path keyPath = Paths.get(keyName);
     long parentID = checkIntermediatePaths(keyPath);
 
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
+
     // Check open key entry
     Path keyPathFileName = keyPath.getFileName();
     Assert.assertNotNull("Failed to find fileName", keyPathFileName);
     String fileName = keyPathFileName.toString();
-    String openKey = omMetadataManager.getOpenFileName(parentID, fileName,
-            omRequest.getCreateKeyRequest().getClientID());
+    String openKey = omMetadataManager.getOpenFileName(volumeId, bucketId,
+            parentID, fileName, omRequest.getCreateKeyRequest().getClientID());
     OmKeyInfo omKeyInfo =
         omMetadataManager.getOpenKeyTable(omKeyCreateRequest.getBucketLayout())
             .get(openKey);
@@ -95,6 +100,7 @@ public class TestOMKeyCreateRequestWithFSO extends TestOMKeyCreateRequest {
             omMetadataManager.getBucketTable().get(bucketKey);
     Assert.assertNotNull("Bucket not found!", omBucketInfo);
     long lastKnownParentId = omBucketInfo.getObjectID();
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
 
     Iterator<Path> elements = keyPath.iterator();
     StringBuilder fullKeyPath = new StringBuilder(bucketKey);
@@ -102,8 +108,8 @@ public class TestOMKeyCreateRequestWithFSO extends TestOMKeyCreateRequest {
       String fileName = elements.next().toString();
       fullKeyPath.append(OzoneConsts.OM_KEY_PREFIX);
       fullKeyPath.append(fileName);
-      String dbNodeName = omMetadataManager.getOzonePathKey(
-              lastKnownParentId, fileName);
+      String dbNodeName = omMetadataManager.getOzonePathKey(volumeId,
+              omBucketInfo.getObjectID(), lastKnownParentId, fileName);
       OmDirectoryInfo omDirInfo = omMetadataManager.getDirectoryTable().
               get(dbNodeName);
 
@@ -117,27 +123,33 @@ public class TestOMKeyCreateRequestWithFSO extends TestOMKeyCreateRequest {
 
   @Override
   protected String getOpenKey(long id) throws IOException {
-    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
-    OmBucketInfo omBucketInfo =
-            omMetadataManager.getBucketTable().get(bucketKey);
-    if (omBucketInfo != null) {
-      return omMetadataManager.getOpenFileName(omBucketInfo.getObjectID(),
-              keyName, id);
-    } else {
-      return omMetadataManager.getOpenFileName(1000, keyName, id);
-    }
+
+    OmVolumeArgs volumeInfo = omMetadataManager.getVolumeTable()
+            .get(omMetadataManager.getVolumeKey(volumeName));
+    OmBucketInfo omBucketInfo = omMetadataManager.getBucketTable()
+            .get(omMetadataManager.getBucketKey(volumeName, bucketName));
+    return omMetadataManager.getOpenFileName(
+            volumeInfo == null ? 100 : volumeInfo.getObjectID(),
+            omBucketInfo == null ? 1000 : omBucketInfo.getObjectID(),
+            omBucketInfo == null ? 1000 : omBucketInfo.getObjectID(),
+            keyName, id);
+
   }
 
   @Override
   protected String getOzoneKey() throws IOException {
     String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
     OmBucketInfo omBucketInfo =
         omMetadataManager.getBucketTable().get(bucketKey);
     if (omBucketInfo != null) {
-      return omMetadataManager.getOzonePathKey(omBucketInfo.getObjectID(),
-          keyName);
+      final long bucketId = omMetadataManager.getBucketId(volumeName,
+              bucketName);
+      return omMetadataManager.getOzonePathKey(volumeId, bucketId,
+              omBucketInfo.getObjectID(), keyName);
     } else {
-      return omMetadataManager.getOzonePathKey(1000, keyName);
+      return omMetadataManager.getOzonePathKey(volumeId, 1000,
+              1000, keyName);
     }
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequestWithFSO.java
index 4dce60bec3..5a38aa0ab1 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequestWithFSO.java
@@ -73,7 +73,13 @@ public class TestOMKeyDeleteRequestWithFSO extends TestOMKeyDeleteRequest {
     omKeyInfo.setKeyName(FILE_NAME);
     OMRequestTestUtils.addFileToKeyTable(false, false,
         FILE_NAME, omKeyInfo, -1, 50, omMetadataManager);
-    return omKeyInfo.getPath();
+    final long volumeId = omMetadataManager.getVolumeId(
+            omKeyInfo.getVolumeName());
+    final long bucketId = omMetadataManager.getBucketId(
+            omKeyInfo.getVolumeName(), omKeyInfo.getBucketName());
+    return omMetadataManager.getOzonePathKey(
+            volumeId, bucketId, omKeyInfo.getParentObjectID(),
+            omKeyInfo.getFileName());
   }
 
   protected String addKeyToDirTable(String volumeName, String bucketName,
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMOpenKeysDeleteRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMOpenKeysDeleteRequest.java
index 74abefc235..49daf51ec3 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMOpenKeysDeleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMOpenKeysDeleteRequest.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.om.request.key;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -90,6 +91,8 @@ public class TestOMOpenKeysDeleteRequest extends TestOMKeyRequest {
    */
   @Test
   public void testDeleteOpenKeysNotInTable() throws Exception {
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+            omMetadataManager, getBucketLayout());
     List<Pair<Long, OmKeyInfo>> openKeys =
         makeOpenKeys(volumeName, bucketName, 5);
     deleteOpenKeysFromCache(openKeys);
@@ -111,6 +114,13 @@ public class TestOMOpenKeysDeleteRequest extends TestOMKeyRequest {
     final String bucket1 = UUID.randomUUID().toString();
     final String bucket2 = UUID.randomUUID().toString();
 
+    OMRequestTestUtils.addVolumeAndBucketToDB(volume1, bucket1,
+            omMetadataManager, getBucketLayout());
+    OMRequestTestUtils.addVolumeAndBucketToDB(volume1, bucket2,
+            omMetadataManager, getBucketLayout());
+    OMRequestTestUtils.addVolumeAndBucketToDB(volume2, bucket2,
+            omMetadataManager, getBucketLayout());
+
     List<Pair<Long, OmKeyInfo>> v1b1KeysToDelete =
         makeOpenKeys(volume1, bucket1, 3);
     List<Pair<Long, OmKeyInfo>> v1b1KeysToKeep =
@@ -165,6 +175,9 @@ public class TestOMOpenKeysDeleteRequest extends TestOMKeyRequest {
     final String bucket = UUID.randomUUID().toString();
     final String key = UUID.randomUUID().toString();
 
+    OMRequestTestUtils.addVolumeAndBucketToDB(volume, bucket,
+            omMetadataManager, getBucketLayout());
+
     List<Pair<Long, OmKeyInfo>> keysToKeep =
         makeOpenKeys(volume, bucket, key, 3);
     List<Pair<Long, OmKeyInfo>> keysToDelete =
@@ -192,6 +205,9 @@ public class TestOMOpenKeysDeleteRequest extends TestOMKeyRequest {
     final int numExistentKeys = 3;
     final int numNonExistentKeys = 5;
 
+    OMRequestTestUtils.addVolumeAndBucketToDB(volume, bucket,
+            omMetadataManager, getBucketLayout());
+
     OMMetrics metrics = ozoneManager.getMetrics();
     Assert.assertEquals(metrics.getNumOpenKeyDeleteRequests(), 0);
     Assert.assertEquals(metrics.getNumOpenKeyDeleteRequestFails(), 0);
@@ -354,22 +370,29 @@ public class TestOMOpenKeysDeleteRequest extends TestOMKeyRequest {
     }
   }
 
-  private List<String> getDBKeyNames(List<Pair<Long, OmKeyInfo>> openKeys) {
-    return openKeys.stream()
-        .map(getBucketLayout().isFileSystemOptimized() ?
-            p -> omMetadataManager.getOpenFileName(
-                p.getRight().getParentObjectID(),
-                p.getRight().getFileName(),
-                p.getLeft()
-            ) :
-            p -> omMetadataManager.getOpenKey(
-                p.getRight().getVolumeName(),
-                p.getRight().getBucketName(),
-                p.getRight().getKeyName(),
-                p.getLeft()
-            )
-        )
-        .collect(Collectors.toList());
+  private List<String> getDBKeyNames(List<Pair<Long, OmKeyInfo>> openKeys)
+          throws IOException {
+
+    final List<String> result = new ArrayList<>();
+    for (Pair<Long, OmKeyInfo> entry : openKeys) {
+      final OmKeyInfo ki = entry.getRight();
+      if (getBucketLayout().isFileSystemOptimized()) {
+        result.add(omMetadataManager.getOpenFileName(
+                omMetadataManager.getVolumeId(ki.getVolumeName()),
+                omMetadataManager.getBucketId(ki.getVolumeName(),
+                        ki.getBucketName()),
+                ki.getParentObjectID(),
+                ki.getFileName(),
+                entry.getLeft()));
+      } else {
+        result.add(omMetadataManager.getOpenKey(
+                entry.getRight().getVolumeName(),
+                entry.getRight().getBucketName(),
+                entry.getRight().getKeyName(),
+                entry.getLeft()));
+      }
+    }
+    return result;
   }
 
   /**
@@ -398,7 +421,7 @@ public class TestOMOpenKeysDeleteRequest extends TestOMKeyRequest {
    * {@code OpenKeyDeleteRequest}.
    */
   private OMRequest createDeleteOpenKeyRequest(
-      List<Pair<Long, OmKeyInfo>> keysToDelete) {
+      List<Pair<Long, OmKeyInfo>> keysToDelete) throws IOException {
 
     List<String> names = getDBKeyNames(keysToDelete);
 
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequestWithFSO.java
index 2256593da3..6fb26486ad 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequestWithFSO.java
@@ -19,7 +19,6 @@
 
 package org.apache.hadoop.ozone.om.request.s3.multipart;
 
-import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
@@ -58,11 +57,9 @@ public class TestS3InitiateMultipartUploadRequestWithFSO
     OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
         omMetadataManager, getBucketLayout());
 
-    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
-    OmBucketInfo omBucketInfo =
-            omMetadataManager.getBucketTable().get(bucketKey);
-    long bucketID = omBucketInfo.getObjectID();
-
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
     OMRequest modifiedRequest = doPreExecuteInitiateMPUWithFSO(volumeName,
         bucketName, keyName);
 
@@ -77,15 +74,16 @@ public class TestS3InitiateMultipartUploadRequestWithFSO
     Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
             omClientResponse.getOMResponse().getStatus());
 
-    long parentID = verifyDirectoriesInDB(dirs, bucketID);
+    long parentID = verifyDirectoriesInDB(dirs, volumeId, bucketId);
 
     String multipartFileKey = omMetadataManager
         .getMultipartKey(volumeName, bucketName, keyName,
             modifiedRequest.getInitiateMultiPartUploadRequest().getKeyArgs()
                 .getMultipartUploadID());
 
-    String multipartOpenFileKey = omMetadataManager.getMultipartKey(parentID,
-            fileName, modifiedRequest.getInitiateMultiPartUploadRequest()
+    String multipartOpenFileKey = omMetadataManager.getMultipartKey(volumeId,
+            bucketId, parentID, fileName,
+            modifiedRequest.getInitiateMultiPartUploadRequest()
                     .getKeyArgs().getMultipartUploadID());
 
     OmKeyInfo omKeyInfo = omMetadataManager
@@ -119,15 +117,17 @@ public class TestS3InitiateMultipartUploadRequestWithFSO
             .getCreationTime());
   }
 
-  private long verifyDirectoriesInDB(List<String> dirs, long bucketID)
+  private long verifyDirectoriesInDB(List<String> dirs, final long volumeId,
+                                     final long bucketId)
       throws IOException {
     // bucketID is the parent
-    long parentID = bucketID;
+    long parentID = bucketId;
     for (int indx = 0; indx < dirs.size(); indx++) {
       String dirName = dirs.get(indx);
       String dbKey = "";
       // for index=0, parentID is bucketID
-      dbKey = omMetadataManager.getOzonePathKey(parentID, dirName);
+      dbKey = omMetadataManager.getOzonePathKey(volumeId, bucketId,
+              parentID, dirName);
       OmDirectoryInfo omDirInfo =
               omMetadataManager.getDirectoryTable().get(dbKey);
       Assert.assertNotNull("Invalid directory!", omDirInfo);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadAbortRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadAbortRequest.java
index 5c8c121152..f7490d6204 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadAbortRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadAbortRequest.java
@@ -194,7 +194,7 @@ public class TestS3MultipartUploadAbortRequest extends TestS3MultipartRequest {
   }
 
   protected String getMultipartOpenKey(String volumeName, String bucketName,
-      String keyName, String multipartUploadID) {
+      String keyName, String multipartUploadID) throws IOException {
     return omMetadataManager.getMultipartKey(volumeName,
         bucketName, keyName, multipartUploadID);
   }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadAbortRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadAbortRequestWithFSO.java
index 21b96405d9..3c710988a5 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadAbortRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadAbortRequestWithFSO.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 
+import java.io.IOException;
 import java.util.UUID;
 
 /**
@@ -64,10 +65,13 @@ public class TestS3MultipartUploadAbortRequestWithFSO
 
   @Override
   protected String getMultipartOpenKey(String volumeName, String bucketName,
-      String keyName, String multipartUploadID) {
+      String keyName, String multipartUploadID) throws IOException {
     String fileName = StringUtils.substringAfter(keyName, dirName);
-    return omMetadataManager.getMultipartKey(parentID, fileName,
-        multipartUploadID);
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
+    return omMetadataManager.getMultipartKey(volumeId, bucketId,
+            parentID, fileName, multipartUploadID);
   }
 
   @Override
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequest.java
index 9e205896bc..f550b2f791 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequest.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.util.Time;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.UUID;
 
 /**
@@ -230,13 +231,13 @@ public class TestS3MultipartUploadCommitPartRequest
   }
 
   protected String getMultipartOpenKey(String volumeName, String bucketName,
-      String keyName, String multipartUploadID) {
+      String keyName, String multipartUploadID) throws IOException {
     return omMetadataManager
         .getMultipartKey(volumeName, bucketName, keyName, multipartUploadID);
   }
 
   protected String getOpenKey(String volumeName, String bucketName,
-      String keyName, long clientID) {
+      String keyName, long clientID) throws IOException {
     return omMetadataManager.getOpenKey(volumeName, bucketName,
         keyName, clientID);
   }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequestWithFSO.java
index 7de016ad1d..e0e2227416 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequestWithFSO.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMReque
 import org.apache.hadoop.util.Time;
 import org.junit.Assert;
 
+import java.io.IOException;
 import java.util.UUID;
 
 /**
@@ -75,17 +76,24 @@ public class TestS3MultipartUploadCommitPartRequestWithFSO
 
   @Override
   protected String getMultipartOpenKey(String volumeName, String bucketName,
-      String keyName, String multipartUploadID) {
+      String keyName, String multipartUploadID) throws IOException {
     String fileName = StringUtils.substringAfter(keyName, dirName);
-    return omMetadataManager.getMultipartKey(parentID, fileName,
-            multipartUploadID);
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
+    return omMetadataManager.getMultipartKey(volumeId, bucketId,
+            parentID, fileName, multipartUploadID);
   }
 
   @Override
   protected String getOpenKey(String volumeName, String bucketName,
-      String keyName, long clientID) {
+      String keyName, long clientID) throws IOException {
     String fileName = StringUtils.substringAfter(keyName, dirName);
-    return omMetadataManager.getOpenFileName(parentID, fileName, clientID);
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
+    return omMetadataManager.getOpenFileName(volumeId, bucketId,
+            parentID, fileName, clientID);
   }
 
   @Override
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestWithFSO.java
index 0824a324d5..95bd068698 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestWithFSO.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.om.request.s3.multipart;
 
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
@@ -92,21 +91,22 @@ public class TestS3MultipartUploadCompleteRequestWithFSO
             bucketName, keyName, 0);
 
     Assert.assertNotNull("key not found in DB!", keyStatus);
-
-    return omMetadataManager.getMultipartKey(keyStatus.getKeyInfo()
-                    .getParentObjectID(), keyStatus.getTrimmedName(),
-            multipartUploadID);
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
+    return omMetadataManager.getMultipartKey(volumeId, bucketId,
+            keyStatus.getKeyInfo().getParentObjectID(),
+            keyStatus.getTrimmedName(), multipartUploadID);
   }
 
   private long getParentID(String volumeName, String bucketName,
                            String keyName) throws IOException {
     Path keyPath = Paths.get(keyName);
     Iterator<Path> elements = keyPath.iterator();
-    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
-    OmBucketInfo omBucketInfo =
-            omMetadataManager.getBucketTable().get(bucketKey);
-
-    return OMFileRequest.getParentID(omBucketInfo.getObjectID(),
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
+    return OMFileRequest.getParentID(volumeId, bucketId,
             elements, keyName, omMetadataManager);
   }
 
@@ -115,7 +115,11 @@ public class TestS3MultipartUploadCompleteRequestWithFSO
                                  String keyName) throws IOException {
     long parentID = getParentID(volumeName, bucketName, keyName);
     String fileName = OzoneFSUtils.getFileName(keyName);
-    return omMetadataManager.getOzonePathKey(parentID, fileName);
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
+    return omMetadataManager.getOzonePathKey(volumeId, bucketId,
+            parentID, fileName);
   }
 
   @Override
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/file/TestOMDirectoryCreateResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/file/TestOMDirectoryCreateResponseWithFSO.java
index 35064ba17a..0a8471ce98 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/file/TestOMDirectoryCreateResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/file/TestOMDirectoryCreateResponseWithFSO.java
@@ -18,13 +18,19 @@
 
 package org.apache.hadoop.ozone.om.response.file;
 
+import com.google.common.base.Optional;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.StorageType;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.apache.hadoop.ozone.om.request.file.OMDirectoryCreateRequestWithFSO;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@@ -35,6 +41,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.UUID;
 
@@ -62,6 +69,12 @@ public class TestOMDirectoryCreateResponseWithFSO {
 
     String keyName = UUID.randomUUID().toString();
 
+    final String volume = "volume";
+    final String bucket = "bucket";
+    addVolumeToDB(volume);
+    addBucketToDB(volume, bucket);
+    final long volumeId = omMetadataManager.getVolumeId(volume);
+    final long bucketId = omMetadataManager.getBucketId(volume, bucket);
     long parentID = 100;
     OmDirectoryInfo omDirInfo =
             OMRequestTestUtils.createOmDirectoryInfo(keyName, 500, parentID);
@@ -73,9 +86,10 @@ public class TestOMDirectoryCreateResponseWithFSO {
             .build();
 
     OMDirectoryCreateResponseWithFSO omDirectoryCreateResponseWithFSO =
-        new OMDirectoryCreateResponseWithFSO(omResponse, omDirInfo,
-            new ArrayList<>(), OMDirectoryCreateRequestWithFSO.Result.SUCCESS,
-            BucketLayout.FILE_SYSTEM_OPTIMIZED);
+        new OMDirectoryCreateResponseWithFSO(omResponse, volume, bucket,
+                omDirInfo, new ArrayList<>(),
+                OMDirectoryCreateRequestWithFSO.Result.SUCCESS,
+                BucketLayout.FILE_SYSTEM_OPTIMIZED);
 
     omDirectoryCreateResponseWithFSO
         .addToDBBatch(omMetadataManager, batchOperation);
@@ -84,6 +98,35 @@ public class TestOMDirectoryCreateResponseWithFSO {
     omMetadataManager.getStore().commitBatchOperation(batchOperation);
 
     Assert.assertNotNull(omMetadataManager.getDirectoryTable().get(
-            omMetadataManager.getOzonePathKey(parentID, keyName)));
+            omMetadataManager.getOzonePathKey(volumeId, bucketId,
+                    parentID, keyName)));
+  }
+
+  private void addVolumeToDB(String volumeName) throws IOException {
+    final OmVolumeArgs volumeArgs = OmVolumeArgs.newBuilder()
+            .setVolume(volumeName)
+            .setAdminName("admin")
+            .setOwnerName("owner")
+            .setObjectID(System.currentTimeMillis())
+            .build();
+
+    omMetadataManager.getVolumeTable().addCacheEntry(
+            new CacheKey<>(omMetadataManager.getVolumeKey(volumeName)),
+            new CacheValue<>(Optional.of(volumeArgs), 1));
+  }
+  private void addBucketToDB(String volumeName, String bucketName)
+          throws IOException {
+    final OmBucketInfo omBucketInfo = OmBucketInfo.newBuilder()
+            .setVolumeName(volumeName)
+            .setBucketName(bucketName)
+            .setObjectID(System.currentTimeMillis())
+            .setStorageType(StorageType.DISK)
+            .setIsVersionEnabled(false)
+            .build();
+
+    omMetadataManager.getBucketTable().addCacheEntry(
+            new CacheKey<>(omMetadataManager.getBucketKey(
+                    volumeName, bucketName)),
+            new CacheValue<>(Optional.of(omBucketInfo), 1));
   }
 }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/file/TestOMFileCreateResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/file/TestOMFileCreateResponseWithFSO.java
index 397e10ad6e..1e251e5d9f 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/file/TestOMFileCreateResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/file/TestOMFileCreateResponseWithFSO.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.util.Time;
 import org.jetbrains.annotations.NotNull;
 import org.junit.Assert;
 
+import java.io.IOException;
 import java.util.ArrayList;
 
 /**
@@ -49,9 +50,12 @@ public class TestOMFileCreateResponseWithFSO extends TestOMKeyCreateResponse {
 
   @NotNull
   @Override
-  protected String getOpenKeyName() {
+  protected String getOpenKeyName() throws IOException {
     Assert.assertNotNull(omBucketInfo);
-    return omMetadataManager.getOpenFileName(
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
+    return omMetadataManager.getOpenFileName(volumeId, bucketId,
             omBucketInfo.getObjectID(), keyName, clientID);
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMAllocateBlockResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMAllocateBlockResponseWithFSO.java
index ec137451ed..b87deaf420 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMAllocateBlockResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMAllocateBlockResponseWithFSO.java
@@ -57,7 +57,10 @@ public class TestOMAllocateBlockResponseWithFSO
 
   @Override
   protected String getOpenKey() throws Exception {
-    return omMetadataManager.getOpenFileName(
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
+    return omMetadataManager.getOpenFileName(volumeId, bucketId,
             parentID, fileName, clientID);
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java
index de8a4bbc13..5782e0df49 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 
+import java.io.IOException;
+
 /**
  * Tests OMKeyCommitResponse.
  */
@@ -144,7 +146,7 @@ public class TestOMKeyCommitResponse extends TestOMKeyResponse {
   }
 
   @NotNull
-  protected String getOzoneKey() {
+  protected String getOzoneKey() throws IOException {
     Assert.assertNotNull(omBucketInfo);
     return omMetadataManager.getOzoneKey(volumeName,
             omBucketInfo.getBucketName(), keyName);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseWithFSO.java
index caed4a031e..d097950491 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseWithFSO.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.util.Time;
 import org.jetbrains.annotations.NotNull;
 import org.junit.Assert;
 
+import java.io.IOException;
+
 /**
  * Tests OMKeyCommitResponse - prefix layout.
  */
@@ -75,18 +77,24 @@ public class TestOMKeyCommitResponseWithFSO extends TestOMKeyCommitResponse {
 
   @NotNull
   @Override
-  protected String getOpenKeyName() {
+  protected String getOpenKeyName() throws IOException  {
     Assert.assertNotNull(omBucketInfo);
-    return omMetadataManager.getOpenFileName(
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
+    return omMetadataManager.getOpenFileName(volumeId, bucketId,
             omBucketInfo.getObjectID(), keyName, clientID);
   }
 
   @NotNull
   @Override
-  protected String getOzoneKey() {
+  protected String getOzoneKey()  throws IOException {
     Assert.assertNotNull(omBucketInfo);
-    return omMetadataManager.getOzonePathKey(omBucketInfo.getObjectID(),
-            keyName);
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
+    return omMetadataManager.getOzonePathKey(volumeId, bucketId,
+            omBucketInfo.getObjectID(), keyName);
   }
 
   @Override
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCreateResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCreateResponseWithFSO.java
index a7ec3c1d47..fe0af3a337 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCreateResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCreateResponseWithFSO.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.util.Time;
 import org.jetbrains.annotations.NotNull;
 import org.junit.Assert;
 
+import java.io.IOException;
 import java.util.ArrayList;
 
 /**
@@ -36,9 +37,12 @@ public class TestOMKeyCreateResponseWithFSO extends TestOMKeyCreateResponse {
 
   @NotNull
   @Override
-  protected String getOpenKeyName() {
+  protected String getOpenKeyName() throws IOException {
     Assert.assertNotNull(omBucketInfo);
-    return omMetadataManager.getOpenFileName(
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
+    return omMetadataManager.getOpenFileName(volumeId, bucketId,
             omBucketInfo.getObjectID(), keyName, clientID);
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponse.java
index fa741777e2..585dba6694 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponse.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
@@ -46,9 +47,12 @@ public class TestOMKeyDeleteResponse extends TestOMKeyResponse {
   public void testAddToDBBatch() throws Exception {
     omBucketInfo = OmBucketInfo.newBuilder()
             .setVolumeName(volumeName).setBucketName(bucketName)
+            .setObjectID(System.currentTimeMillis())
             .setCreationTime(Time.now()).build();
 
-    OmKeyInfo omKeyInfo = getOmKeyInfo();
+    String ozoneKey = addKeyToTable();
+    OmKeyInfo omKeyInfo = omMetadataManager
+            .getKeyTable(getBucketLayout()).get(ozoneKey);
 
     OzoneManagerProtocolProtos.OMResponse omResponse =
         OzoneManagerProtocolProtos.OMResponse.newBuilder().setDeleteKeyResponse(
@@ -60,8 +64,6 @@ public class TestOMKeyDeleteResponse extends TestOMKeyResponse {
     OMKeyDeleteResponse omKeyDeleteResponse = getOmKeyDeleteResponse(omKeyInfo,
             omResponse);
 
-    String ozoneKey = addKeyToTable();
-
     Assert.assertTrue(
         omMetadataManager.getKeyTable(getBucketLayout()).isExist(ozoneKey));
     omKeyDeleteResponse.addToDBBatch(omMetadataManager, batchOperation);
@@ -85,9 +87,13 @@ public class TestOMKeyDeleteResponse extends TestOMKeyResponse {
   public void testAddToDBBatchWithNonEmptyBlocks() throws Exception {
     omBucketInfo = OmBucketInfo.newBuilder()
             .setVolumeName(volumeName).setBucketName(bucketName)
+            .setObjectID(System.currentTimeMillis())
             .setCreationTime(Time.now()).build();
 
-    OmKeyInfo omKeyInfo = getOmKeyInfo();
+    final String ozoneKey = addKeyToTable();
+    final OmKeyInfo omKeyInfo = omMetadataManager
+            .getKeyTable(getBucketLayout())
+            .get(ozoneKey);
 
     // Add block to key.
     List<OmKeyLocationInfo> omKeyLocationInfoList = new ArrayList<>();
@@ -110,8 +116,6 @@ public class TestOMKeyDeleteResponse extends TestOMKeyResponse {
 
     omKeyInfo.appendNewBlocks(omKeyLocationInfoList, false);
 
-    String ozoneKey = addKeyToTable();
-
     OzoneManagerProtocolProtos.OMResponse omResponse =
         OzoneManagerProtocolProtos.OMResponse.newBuilder().setDeleteKeyResponse(
             OzoneManagerProtocolProtos.DeleteKeyResponse.getDefaultInstance())
@@ -191,4 +195,8 @@ public class TestOMKeyDeleteResponse extends TestOMKeyResponse {
   protected OmBucketInfo getOmBucketInfo() {
     return omBucketInfo;
   }
+
+  public BucketLayout getBucketLayout() {
+    return BucketLayout.OBJECT_STORE;
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponseWithFSO.java
index 4455565914..ca37e55c48 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponseWithFSO.java
@@ -56,7 +56,10 @@ public class TestOMKeyDeleteResponseWithFSO extends TestOMKeyDeleteResponse {
                     parentId, 100, Time.now());
     OMRequestTestUtils.addFileToKeyTable(false, false,
             keyName, omKeyInfo, -1, 50, omMetadataManager);
-    return omKeyInfo.getPath();
+    return omMetadataManager.getOzonePathKey(
+            omMetadataManager.getVolumeId(volumeName),
+            omMetadataManager.getBucketId(volumeName, bucketName),
+           omKeyInfo.getParentObjectID(), keyName);
   }
 
   @Override
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyResponse.java
index b41c8680b3..0a2147beb7 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyResponse.java
@@ -18,12 +18,18 @@
 
 package org.apache.hadoop.ozone.om.response.key;
 
+import java.io.IOException;
 import java.util.Random;
 import java.util.UUID;
 
+import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.jetbrains.annotations.NotNull;
@@ -78,10 +84,34 @@ public class TestOMKeyResponse {
     clientID = 1000L;
     random = new Random();
     keysToDelete = null;
+
+    final OmVolumeArgs volumeArgs = OmVolumeArgs.newBuilder()
+            .setVolume(volumeName)
+            .setAdminName("admin")
+            .setOwnerName("owner")
+            .setObjectID(System.currentTimeMillis())
+            .build();
+
+    omMetadataManager.getVolumeTable().addCacheEntry(
+            new CacheKey<>(omMetadataManager.getVolumeKey(volumeName)),
+            new CacheValue<>(Optional.of(volumeArgs), 1));
+
+    omBucketInfo = OmBucketInfo.newBuilder()
+            .setVolumeName(volumeName)
+            .setBucketName(bucketName)
+            .setObjectID(System.currentTimeMillis())
+            .setStorageType(StorageType.DISK)
+            .setIsVersionEnabled(false)
+            .build();
+
+    omMetadataManager.getBucketTable().addCacheEntry(
+            new CacheKey<>(omMetadataManager.getBucketKey(
+                    volumeName, bucketName)),
+            new CacheValue<>(Optional.of(omBucketInfo), 1));
   }
 
   @NotNull
-  protected String getOpenKeyName() {
+  protected String getOpenKeyName()  throws IOException {
     return omMetadataManager.getOpenKey(volumeName, bucketName, keyName,
             clientID);
   }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMOpenKeysDeleteResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMOpenKeysDeleteResponse.java
index 876c3bd76a..510ac2afc6 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMOpenKeysDeleteResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMOpenKeysDeleteResponse.java
@@ -38,6 +38,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
+import static org.apache.hadoop.ozone.om.request.OMRequestTestUtils.addBucketToDB;
+
 /**
  * Tests OMOpenKeysDeleteResponse.
  */
@@ -69,6 +71,8 @@ public class TestOMOpenKeysDeleteResponse extends TestOMKeyResponse {
    */
   @Test
   public void testAddToDBBatchWithEmptyBlocks() throws Exception {
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+            omMetadataManager, getBucketLayout());
     Map<String, OmKeyInfo> keysToDelete = addOpenKeysToDB(volumeName, 3);
     Map<String, OmKeyInfo> keysToKeep = addOpenKeysToDB(volumeName, 3);
 
@@ -96,6 +100,8 @@ public class TestOMOpenKeysDeleteResponse extends TestOMKeyResponse {
    */
   @Test
   public void testAddToDBBatchWithNonEmptyBlocks() throws Exception {
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+            omMetadataManager, getBucketLayout());
     Map<String, OmKeyInfo> keysToDelete = addOpenKeysToDB(volumeName, 3,
         KEY_LENGTH);
     Map<String, OmKeyInfo> keysToKeep = addOpenKeysToDB(volumeName, 3,
@@ -126,6 +132,8 @@ public class TestOMOpenKeysDeleteResponse extends TestOMKeyResponse {
    */
   @Test
   public void testAddToDBBatchWithErrorResponse() throws Exception {
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+            omMetadataManager, getBucketLayout());
     Map<String, OmKeyInfo> keysToDelete = addOpenKeysToDB(volumeName, 3);
 
     createAndCommitResponse(keysToDelete, Status.INTERNAL_ERROR);
@@ -190,6 +198,7 @@ public class TestOMOpenKeysDeleteResponse extends TestOMKeyResponse {
     for (int i = 0; i < numKeys; i++) {
       String bucket = UUID.randomUUID().toString();
       String key = UUID.randomUUID().toString();
+      addBucketToDB(volume, bucket, omMetadataManager, getBucketLayout());
       long clientID = random.nextLong();
       long parentID = random.nextLong();
 
@@ -207,11 +216,14 @@ public class TestOMOpenKeysDeleteResponse extends TestOMKeyResponse {
       // cache by the request, and it would only remain in the DB.
       if (getBucketLayout().isFileSystemOptimized()) {
         String file = OzoneFSUtils.getFileName(key);
+        final long volumeId = omMetadataManager.getVolumeId(volume);
+        final long bucketId = omMetadataManager.getBucketId(volume, bucket);
         omKeyInfo.setFileName(file);
         omKeyInfo.setParentObjectID(parentID);
         OMRequestTestUtils.addFileToKeyTable(true, false, file, omKeyInfo,
             clientID, 0L, omMetadataManager);
-        openKey = omMetadataManager.getOpenFileName(parentID, file, clientID);
+        openKey = omMetadataManager.getOpenFileName(
+                volumeId, bucketId, parentID, file, clientID);
       } else {
         OMRequestTestUtils.addKeyToTable(true, false, omKeyInfo,
             clientID, 0L, omMetadataManager);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3InitiateMultipartUploadResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3InitiateMultipartUploadResponseWithFSO.java
index e02b8e43ac..8c90751641 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3InitiateMultipartUploadResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3InitiateMultipartUploadResponseWithFSO.java
@@ -50,6 +50,9 @@ public class TestS3InitiateMultipartUploadResponseWithFSO
 
     String multipartUploadID = UUID.randomUUID().toString();
 
+    addVolumeToDB(volumeName);
+    addBucketToDB(volumeName, bucketName);
+
     long parentID = 1027; // assume objectID of dir path "a/b/c/d" is 1027
     List<OmDirectoryInfo> parentDirInfos = new ArrayList<>();
 
@@ -66,8 +69,13 @@ public class TestS3InitiateMultipartUploadResponseWithFSO
     String multipartKey = omMetadataManager
         .getMultipartKey(volumeName, bucketName, keyName, multipartUploadID);
 
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
+
     String multipartOpenKey = omMetadataManager
-        .getMultipartKey(parentID, fileName, multipartUploadID);
+        .getMultipartKey(volumeId, bucketId, parentID,
+                fileName, multipartUploadID);
 
     OmKeyInfo omKeyInfo = omMetadataManager.getOpenKeyTable(getBucketLayout())
         .get(multipartOpenKey);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java
index c963cb6c96..9adb39b373 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java
@@ -25,11 +25,16 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 
+import com.google.common.base.Optional;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
@@ -142,8 +147,9 @@ public class TestS3MultipartResponse {
     omMultipartKeyInfo.addPartKeyInfo(partNumber, partKeyInfo);
   }
 
-  public PartKeyInfo createPartKeyInfo(
-      String volumeName, String bucketName, String keyName, int partNumber) {
+  public PartKeyInfo createPartKeyInfo(String volumeName, String bucketName,
+                                       String keyName, int partNumber)
+          throws IOException {
     return PartKeyInfo.newBuilder()
         .setPartNumber(partNumber)
         .setPartName(omMetadataManager.getMultipartKey(volumeName,
@@ -161,10 +167,14 @@ public class TestS3MultipartResponse {
 
   public PartKeyInfo createPartKeyInfoFSO(
       String volumeName, String bucketName, long parentID, String fileName,
-      int partNumber) {
+      int partNumber) throws IOException {
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
     return PartKeyInfo.newBuilder()
         .setPartNumber(partNumber)
-        .setPartName(omMetadataManager.getOzonePathKey(parentID, fileName +
+        .setPartName(omMetadataManager.getOzonePathKey(volumeId, bucketId,
+                parentID, fileName +
                 UUID.randomUUID().toString()))
         .setPartKeyInfo(KeyInfo.newBuilder()
             .setVolumeName(volumeName)
@@ -284,14 +294,18 @@ public class TestS3MultipartResponse {
           OzoneManagerProtocolProtos.Status status,
           List<OmKeyInfo> unUsedParts,
           OmBucketInfo omBucketInfo,
-          RepeatedOmKeyInfo keysToDelete) {
+          RepeatedOmKeyInfo keysToDelete) throws IOException {
 
 
     String multipartKey = omMetadataManager
         .getMultipartKey(volumeName, bucketName, keyName, multipartUploadID);
 
-    String multipartOpenKey = getMultipartKey(parentID, keyName,
-        multipartUploadID);
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
+    String fileName = OzoneFSUtils.getFileName(keyName);
+    String multipartOpenKey = omMetadataManager.getMultipartKey(
+            volumeId, bucketId, parentID, fileName, multipartUploadID);
 
     OMResponse omResponse = OMResponse.newBuilder()
             .setCmdType(OzoneManagerProtocolProtos.Type.CompleteMultiPartUpload)
@@ -306,13 +320,6 @@ public class TestS3MultipartResponse {
         getBucketLayout(), omBucketInfo, keysToDelete);
   }
 
-  private String getMultipartKey(long parentID, String keyName,
-                                 String multipartUploadID) {
-    String fileName = OzoneFSUtils.getFileName(keyName);
-    return omMetadataManager.getMultipartKey(parentID, fileName,
-            multipartUploadID);
-  }
-
   protected S3InitiateMultipartUploadResponse getS3InitiateMultipartUploadResp(
       OmMultipartKeyInfo multipartKeyInfo, OmKeyInfo omKeyInfo,
       OMResponse omResponse) {
@@ -332,4 +339,33 @@ public class TestS3MultipartResponse {
   public BucketLayout getBucketLayout() {
     return BucketLayout.DEFAULT;
   }
+
+  public void addVolumeToDB(String volumeName) throws IOException {
+    final OmVolumeArgs volumeArgs = OmVolumeArgs.newBuilder()
+            .setVolume(volumeName)
+            .setAdminName("admin")
+            .setOwnerName("owner")
+            .setObjectID(System.currentTimeMillis())
+            .build();
+
+    omMetadataManager.getVolumeTable().addCacheEntry(
+            new CacheKey<>(omMetadataManager.getVolumeKey(volumeName)),
+            new CacheValue<>(Optional.of(volumeArgs), 1));
+  }
+
+  public void addBucketToDB(String volumeName, String bucketName)
+          throws IOException {
+    final OmBucketInfo omBucketInfo = OmBucketInfo.newBuilder()
+            .setVolumeName(volumeName)
+            .setBucketName(bucketName)
+            .setObjectID(System.currentTimeMillis())
+            .setStorageType(StorageType.DISK)
+            .setIsVersionEnabled(false)
+            .build();
+
+    omMetadataManager.getBucketTable().addCacheEntry(
+            new CacheKey<>(omMetadataManager.getBucketKey(
+                    volumeName, bucketName)),
+            new CacheValue<>(Optional.of(omBucketInfo), 1));
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadAbortResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadAbortResponse.java
index b7046d87d4..8a41dbe023 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadAbortResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadAbortResponse.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.om.response.s3.multipart;
 
+import java.io.IOException;
 import java.util.UUID;
 
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -45,6 +46,9 @@ public class TestS3MultipartUploadAbortResponse
     String bucketName = UUID.randomUUID().toString();
     String keyName = getKeyName();
     String multipartUploadID = UUID.randomUUID().toString();
+
+    addVolumeToDB(volumeName);
+    addBucketToDB(volumeName, bucketName);
     String multipartOpenKey = getMultipartOpenKey(volumeName, bucketName,
         keyName, multipartUploadID);
 
@@ -107,6 +111,8 @@ public class TestS3MultipartUploadAbortResponse
     String bucketName = UUID.randomUUID().toString();
     String keyName = getKeyName();
     String multipartUploadID = UUID.randomUUID().toString();
+    addVolumeToDB(volumeName);
+    addBucketToDB(volumeName, bucketName);
     String multipartOpenKey = getMultipartOpenKey(volumeName, bucketName,
         keyName, multipartUploadID);
     String multipartKey = omMetadataManager.getMultipartKey(volumeName,
@@ -185,7 +191,7 @@ public class TestS3MultipartUploadAbortResponse
   }
 
   protected String getMultipartOpenKey(String volumeName, String bucketName,
-      String keyName, String multipartUploadID) {
+      String keyName, String multipartUploadID) throws IOException {
     return omMetadataManager.getMultipartKey(volumeName,
         bucketName, keyName, multipartUploadID);
   }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadAbortResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadAbortResponseWithFSO.java
index 019b2f1d73..e5d1536667 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadAbortResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadAbortResponseWithFSO.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.UUID;
 
@@ -51,10 +52,13 @@ public class TestS3MultipartUploadAbortResponseWithFSO
 
   @Override
   protected String getMultipartOpenKey(String volumeName, String bucketName,
-      String keyName, String multipartUploadID) {
+      String keyName, String multipartUploadID) throws IOException {
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
     String fileName = StringUtils.substringAfter(keyName, dirName);
-    return omMetadataManager.getMultipartKey(parentID, fileName,
-        multipartUploadID);
+    return omMetadataManager.getMultipartKey(volumeId, bucketId,
+            parentID, fileName, multipartUploadID);
   }
 
   @Override
@@ -94,7 +98,8 @@ public class TestS3MultipartUploadAbortResponseWithFSO
 
   @Override
   public OzoneManagerProtocolProtos.PartKeyInfo createPartKeyInfo(
-      String volumeName, String bucketName, String keyName, int partNumber) {
+      String volumeName, String bucketName, String keyName, int partNumber)
+          throws IOException {
 
     String fileName = OzoneFSUtils.getFileName(keyName);
     return createPartKeyInfoFSO(volumeName, bucketName, parentID, fileName,
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadCommitPartResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadCommitPartResponseWithFSO.java
index f2089457de..abd2df77e1 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadCommitPartResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadCommitPartResponseWithFSO.java
@@ -57,9 +57,12 @@ public class TestS3MultipartUploadCommitPartResponseWithFSO
     String fileName = OzoneFSUtils.getFileName(keyName);
     String multipartKey = omMetadataManager
         .getMultipartKey(volumeName, bucketName, keyName, multipartUploadID);
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
     long clientId = Time.now();
-    String openKey = omMetadataManager.getOpenFileName(parentID, fileName,
-            clientId);
+    String openKey = omMetadataManager.getOpenFileName(volumeId, bucketId,
+            parentID, fileName, clientId);
 
     S3MultipartUploadCommitPartResponse s3MultipartUploadCommitPartResponse =
         createS3CommitMPUResponseFSO(volumeName, bucketName, parentID, keyName,
@@ -117,8 +120,11 @@ public class TestS3MultipartUploadCommitPartResponseWithFSO
     addPart(1, part1, omMultipartKeyInfo);
 
     long clientId = Time.now();
-    String openKey = omMetadataManager.getOpenFileName(parentID, fileName,
-            clientId);
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
+    String openKey = omMetadataManager.getOpenFileName(volumeId, bucketId,
+            parentID, fileName, clientId);
 
     S3MultipartUploadCommitPartResponse s3MultipartUploadCommitPartResponse =
             createS3CommitMPUResponseFSO(volumeName, bucketName, parentID,
@@ -166,9 +172,13 @@ public class TestS3MultipartUploadCommitPartResponseWithFSO
 
     String multipartUploadID = UUID.randomUUID().toString();
 
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
+
     String fileName = OzoneFSUtils.getFileName(keyName);
-    String multipartKey = omMetadataManager.getMultipartKey(parentID, fileName,
-            multipartUploadID);
+    String multipartKey = omMetadataManager.getMultipartKey(volumeId, bucketId,
+            parentID, fileName, multipartUploadID);
 
     S3InitiateMultipartUploadResponse s3InitiateMultipartUploadResponseFSO =
             createS3InitiateMPUResponseFSO(volumeName, bucketName, parentID,
@@ -189,8 +199,8 @@ public class TestS3MultipartUploadCommitPartResponseWithFSO
     addPart(1, part1, omMultipartKeyInfo);
 
     long clientId = Time.now();
-    String openKey = omMetadataManager.getOpenFileName(parentID, fileName,
-            clientId);
+    String openKey = omMetadataManager.getOpenFileName(volumeId, bucketId,
+            parentID, fileName, clientId);
 
     S3MultipartUploadCommitPartResponse s3MultipartUploadCommitPartResponse =
             createS3CommitMPUResponseFSO(volumeName, bucketName, parentID,
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadCompleteResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadCompleteResponseWithFSO.java
index 140bfd6119..ec0b025e10 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadCompleteResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadCompleteResponseWithFSO.java
@@ -61,10 +61,13 @@ public class TestS3MultipartUploadCompleteResponseWithFSO
     long txnId = 50;
     long objectId = parentID + 1;
     String fileName = OzoneFSUtils.getFileName(keyName);
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
     String dbMultipartKey = omMetadataManager.getMultipartKey(volumeName,
             bucketName, keyName, multipartUploadID);
-    String dbMultipartOpenKey = omMetadataManager.getMultipartKey(parentID,
-            fileName, multipartUploadID);
+    String dbMultipartOpenKey = omMetadataManager.getMultipartKey(volumeId,
+            bucketId, parentID, fileName, multipartUploadID);
     long clientId = Time.now();
 
     // add MPU entry to OpenFileTable
@@ -78,9 +81,10 @@ public class TestS3MultipartUploadCompleteResponseWithFSO
 
     omMetadataManager.getStore().commitBatchOperation(batchOperation);
 
-    String dbOpenKey = omMetadataManager.getOpenFileName(parentID, fileName,
-            clientId);
-    String dbKey = omMetadataManager.getOzonePathKey(parentID, fileName);
+    String dbOpenKey = omMetadataManager.getOpenFileName(volumeId, bucketId,
+            parentID, fileName, clientId);
+    String dbKey = omMetadataManager.getOzonePathKey(volumeId, bucketId,
+            parentID, fileName);
     OmKeyInfo omKeyInfoFSO =
             OMRequestTestUtils.createOmKeyInfo(volumeName, bucketName, keyName,
                     HddsProtos.ReplicationType.RATIS,
@@ -190,12 +194,15 @@ public class TestS3MultipartUploadCompleteResponseWithFSO
           throws Exception {
 
     String multipartUploadID = UUID.randomUUID().toString();
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
 
     String fileName = OzoneFSUtils.getFileName(keyName);
     String dbMultipartKey = omMetadataManager.getMultipartKey(volumeName,
         bucketName, keyName, multipartUploadID);
-    String dbMultipartOpenKey = omMetadataManager.getMultipartKey(parentID,
-        fileName, multipartUploadID);
+    String dbMultipartOpenKey = omMetadataManager.getMultipartKey(volumeId,
+            bucketId, parentID, fileName, multipartUploadID);
 
     S3InitiateMultipartUploadResponse s3InitiateMultipartUploadResponseFSO =
             addS3InitiateMultipartUpload(volumeName, bucketName, keyName,
@@ -237,8 +244,8 @@ public class TestS3MultipartUploadCompleteResponseWithFSO
             batchOperation);
 
     omMetadataManager.getStore().commitBatchOperation(batchOperation);
-    String dbKey = omMetadataManager.getOzonePathKey(parentID,
-          omKeyInfoFSO.getFileName());
+    String dbKey = omMetadataManager.getOzonePathKey(volumeId, bucketId,
+            parentID, omKeyInfoFSO.getFileName());
     Assert.assertNotNull(
         omMetadataManager.getKeyTable(getBucketLayout()).get(dbKey));
     Assert.assertNull(
@@ -256,14 +263,18 @@ public class TestS3MultipartUploadCompleteResponseWithFSO
       OmMultipartKeyInfo omMultipartKeyInfo,
       int deleteEntryCount) throws IOException {
 
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+            bucketName);
+
     PartKeyInfo part1 = createPartKeyInfoFSO(volumeName, bucketName, parentID,
         fileName, 1);
 
     addPart(1, part1, omMultipartKeyInfo);
 
     long clientId = Time.now();
-    String openKey = omMetadataManager.getOpenFileName(parentID, fileName,
-            clientId);
+    String openKey = omMetadataManager.getOpenFileName(volumeId, bucketId,
+            parentID, fileName, clientId);
 
     S3MultipartUploadCommitPartResponse s3MultipartUploadCommitPartResponse =
             createS3CommitMPUResponseFSO(volumeName, bucketName, parentID,
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/NSSummaryEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/NSSummaryEndpoint.java
index ac34c58f97..f901135000 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/NSSummaryEndpoint.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/NSSummaryEndpoint.java
@@ -394,10 +394,13 @@ public class NSSummaryEndpoint {
       // DU for key doesn't have subpaths
       duResponse.setCount(0);
       // The object ID for the directory that the key is directly in
+      final long volumeId = getVolumeObjectId(names);
+      final long bucketId = getBucketObjectId(names);
       long parentObjectId = getDirObjectId(names, names.length - 1);
       String fileName = names[names.length - 1];
       String ozoneKey =
-          omMetadataManager.getOzonePathKey(parentObjectId, fileName);
+          omMetadataManager.getOzonePathKey(volumeId, bucketId,
+                  parentObjectId, fileName);
       OmKeyInfo keyInfo =
           omMetadataManager.getFileTable().getSkipCache(ozoneKey);
       duResponse.setSize(keyInfo.getDataSize());
@@ -615,11 +618,25 @@ public class NSSummaryEndpoint {
           || !bucketExists(volName, bucketName)) {
         return EntityType.UNKNOWN;
       }
+      final long volumeId = getVolumeObjectId(names);
       long bucketObjectId = getBucketObjectId(names);
-      return determineKeyPath(keyName, bucketObjectId);
+      return determineKeyPath(keyName, volumeId, bucketObjectId);
     }
   }
 
+  /**
+   * Given a existent path, get the volume object ID.
+   * @param names valid path request
+   * @return volume objectID
+   * @throws IOException
+   */
+  private long getVolumeObjectId(String[] names) throws IOException {
+    String bucketKey = omMetadataManager.getVolumeKey(names[0]);
+    OmVolumeArgs volumeInfo = omMetadataManager
+            .getVolumeTable().getSkipCache(bucketKey);
+    return volumeInfo.getObjectID();
+  }
+
   /**
    * Given a existent path, get the bucket object ID.
    * @param names valid path request
@@ -656,7 +673,8 @@ public class NSSummaryEndpoint {
     long dirObjectId = getBucketObjectId(names);
     String dirKey = null;
     for (int i = 2; i < cutoff; ++i) {
-      dirKey = omMetadataManager.getOzonePathKey(dirObjectId, names[i]);
+      dirKey = omMetadataManager.getOzonePathKey(getVolumeObjectId(names),
+              getBucketObjectId(names), dirObjectId, names[i]);
       OmDirectoryInfo dirInfo =
           omMetadataManager.getDirectoryTable().getSkipCache(dirKey);
       dirObjectId = dirInfo.getObjectID();
@@ -996,7 +1014,8 @@ public class NSSummaryEndpoint {
    * @return DIRECTORY, KEY, or UNKNOWN
    * @throws IOException
    */
-  private EntityType determineKeyPath(String keyName, long bucketObjectId)
+  private EntityType determineKeyPath(String keyName, long volumeId,
+                                      long bucketObjectId)
       throws IOException {
 
     java.nio.file.Path keyPath = Paths.get(keyName);
@@ -1012,8 +1031,8 @@ public class NSSummaryEndpoint {
       // 'buck1' to the leaf node component, which is 'file1.txt'.
       // 2. If there is no dir exists for the leaf node component 'file1.txt'
       // then do look it on fileTable.
-      String dbNodeName = omMetadataManager.getOzonePathKey(
-          lastKnownParentId, fileName);
+      String dbNodeName = omMetadataManager.getOzonePathKey(volumeId,
+              bucketObjectId, lastKnownParentId, fileName);
       omDirInfo = omMetadataManager.getDirectoryTable()
           .getSkipCache(dbNodeName);
 
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/OMMetadataManagerTestUtils.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/OMMetadataManagerTestUtils.java
index 7b5e88d3df..03a95470d5 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/OMMetadataManagerTestUtils.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/OMMetadataManagerTestUtils.java
@@ -195,10 +195,13 @@ public final class OMMetadataManagerTestUtils {
                                     String fileName,
                                     long objectID,
                                     long parentObjectId,
+                                    long bucketObjectId,
+                                    long volumeObjectId,
                                     long dataSize)
           throws IOException {
     // DB key in FileTable => "parentId/filename"
-    String omKey = omMetadataManager.getOzonePathKey(parentObjectId, fileName);
+    String omKey = omMetadataManager.getOzonePathKey(volumeObjectId,
+            bucketObjectId, parentObjectId, fileName);
     omMetadataManager.getKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED).put(omKey,
             new OmKeyInfo.Builder()
                     .setBucketName(bucket)
@@ -214,6 +217,8 @@ public final class OMMetadataManagerTestUtils {
 
   @SuppressWarnings("checkstyle:parameternumber")
   public static void writeKeyToOm(OMMetadataManager omMetadataManager,
+                                  long volumeObjectId,
+                                  long bucketObjectId,
                                   long parentObjectId,
                                   long objectId,
                                   String volName,
@@ -222,7 +227,8 @@ public final class OMMetadataManagerTestUtils {
                                   String fileName,
                                   List<OmKeyLocationInfoGroup> locationVersions)
           throws IOException {
-    String omKey = omMetadataManager.getOzonePathKey(parentObjectId, fileName);
+    String omKey = omMetadataManager.getOzonePathKey(volumeObjectId,
+            bucketObjectId, parentObjectId, fileName);
     omMetadataManager.getKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED).put(omKey,
             new OmKeyInfo.Builder()
                     .setBucketName(bucketName)
@@ -239,9 +245,12 @@ public final class OMMetadataManagerTestUtils {
   public static void writeDirToOm(OMMetadataManager omMetadataManager,
                                   long objectId,
                                   long parentObjectId,
+                                  long bucketObjectId,
+                                  long volumeObjectId,
                                   String dirName) throws IOException {
     // DB key in DirectoryTable => "parentId/dirName"
-    String omKey = omMetadataManager.getOzonePathKey(parentObjectId, dirName);
+    String omKey = omMetadataManager.getOzonePathKey(volumeObjectId,
+            bucketObjectId, parentObjectId, dirName);
     omMetadataManager.getDirectoryTable().put(omKey,
             new OmDirectoryInfo.Builder()
                     .setName(dirName)
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpoint.java
index 1bb419feee..4cd9767a15 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpoint.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpoint.java
@@ -445,13 +445,17 @@ public class TestNSSummaryEndpoint {
   private void populateOMDB() throws Exception {
     // write all 4 directories
     writeDirToOm(reconOMMetadataManager, DIR_ONE_OBJECT_ID,
-            BUCKET_ONE_OBJECT_ID, DIR_ONE);
+            BUCKET_ONE_OBJECT_ID, BUCKET_ONE_OBJECT_ID,
+            VOL_OBJECT_ID, DIR_ONE);
     writeDirToOm(reconOMMetadataManager, DIR_TWO_OBJECT_ID,
-            DIR_ONE_OBJECT_ID, DIR_TWO);
+            DIR_ONE_OBJECT_ID, BUCKET_ONE_OBJECT_ID,
+            VOL_OBJECT_ID, DIR_TWO);
     writeDirToOm(reconOMMetadataManager, DIR_THREE_OBJECT_ID,
-            DIR_ONE_OBJECT_ID, DIR_THREE);
+            DIR_ONE_OBJECT_ID, BUCKET_ONE_OBJECT_ID,
+            VOL_OBJECT_ID, DIR_THREE);
     writeDirToOm(reconOMMetadataManager, DIR_FOUR_OBJECT_ID,
-            DIR_ONE_OBJECT_ID, DIR_FOUR);
+            DIR_ONE_OBJECT_ID, BUCKET_ONE_OBJECT_ID,
+            VOL_OBJECT_ID, DIR_FOUR);
 
     // write all 6 keys
     writeKeyToOm(reconOMMetadataManager,
@@ -461,6 +465,8 @@ public class TestNSSummaryEndpoint {
             FILE_ONE,
             KEY_ONE_OBJECT_ID,
             BUCKET_ONE_OBJECT_ID,
+            BUCKET_ONE_OBJECT_ID,
+            VOL_OBJECT_ID,
             KEY_ONE_SIZE);
     writeKeyToOm(reconOMMetadataManager,
             KEY_TWO,
@@ -469,6 +475,8 @@ public class TestNSSummaryEndpoint {
             FILE_TWO,
             KEY_TWO_OBJECT_ID,
             DIR_TWO_OBJECT_ID,
+            BUCKET_ONE_OBJECT_ID,
+            VOL_OBJECT_ID,
             KEY_TWO_SIZE);
     writeKeyToOm(reconOMMetadataManager,
             KEY_THREE,
@@ -477,6 +485,8 @@ public class TestNSSummaryEndpoint {
             FILE_THREE,
             KEY_THREE_OBJECT_ID,
             DIR_THREE_OBJECT_ID,
+            BUCKET_ONE_OBJECT_ID,
+            VOL_OBJECT_ID,
             KEY_THREE_SIZE);
     writeKeyToOm(reconOMMetadataManager,
             KEY_FOUR,
@@ -485,6 +495,8 @@ public class TestNSSummaryEndpoint {
             FILE_FOUR,
             KEY_FOUR_OBJECT_ID,
             BUCKET_TWO_OBJECT_ID,
+            BUCKET_TWO_OBJECT_ID,
+            VOL_OBJECT_ID,
             KEY_FOUR_SIZE);
     writeKeyToOm(reconOMMetadataManager,
             KEY_FIVE,
@@ -493,6 +505,8 @@ public class TestNSSummaryEndpoint {
             FILE_FIVE,
             KEY_FIVE_OBJECT_ID,
             BUCKET_TWO_OBJECT_ID,
+            BUCKET_TWO_OBJECT_ID,
+            VOL_OBJECT_ID,
             KEY_FIVE_SIZE);
     writeKeyToOm(reconOMMetadataManager,
             KEY_SIX,
@@ -501,6 +515,8 @@ public class TestNSSummaryEndpoint {
             FILE_SIX,
             KEY_SIX_OBJECT_ID,
             DIR_FOUR_OBJECT_ID,
+            BUCKET_ONE_OBJECT_ID,
+            VOL_OBJECT_ID,
             KEY_SIX_SIZE);
   }
 
@@ -582,6 +598,8 @@ public class TestNSSummaryEndpoint {
 
     // add the multi-block key to Recon's OM
     writeKeyToOm(reconOMMetadataManager,
+            VOL_OBJECT_ID,
+            BUCKET_ONE_OBJECT_ID,
             DIR_ONE_OBJECT_ID,
             MULTI_BLOCK_KEY_OBJECT_ID,
             VOL, BUCKET_ONE,
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTask.java
index 190a09c236..fd4238c631 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTask.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTask.java
@@ -81,6 +81,7 @@ public class TestNSSummaryTask {
   private static final String DIR_FOUR = "dir4";
   private static final String DIR_FIVE = "dir5";
 
+  private static final long VOL_OBJECT_ID = 0L;
   private static final long BUCKET_ONE_OBJECT_ID = 1L;
   private static final long BUCKET_TWO_OBJECT_ID = 2L;
   private static final long KEY_ONE_OBJECT_ID = 3L;
@@ -486,6 +487,8 @@ public class TestNSSummaryTask {
             FILE_ONE,
             KEY_ONE_OBJECT_ID,
             BUCKET_ONE_OBJECT_ID,
+            BUCKET_ONE_OBJECT_ID,
+            VOL_OBJECT_ID,
             KEY_ONE_SIZE);
     writeKeyToOm(reconOMMetadataManager,
             KEY_TWO,
@@ -494,6 +497,8 @@ public class TestNSSummaryTask {
             FILE_TWO,
             KEY_TWO_OBJECT_ID,
             BUCKET_TWO_OBJECT_ID,
+            BUCKET_TWO_OBJECT_ID,
+            VOL_OBJECT_ID,
             KEY_TWO_OLD_SIZE);
     writeKeyToOm(reconOMMetadataManager,
             KEY_THREE,
@@ -502,6 +507,8 @@ public class TestNSSummaryTask {
             FILE_THREE,
             KEY_THREE_OBJECT_ID,
             DIR_TWO_OBJECT_ID,
+            BUCKET_ONE_OBJECT_ID,
+            VOL_OBJECT_ID,
             KEY_THREE_SIZE);
     writeKeyToOm(reconOMMetadataManager,
             KEY_FOUR,
@@ -510,13 +517,18 @@ public class TestNSSummaryTask {
             FILE_FOUR,
             KEY_FOUR_OBJECT_ID,
             BUCKET_TWO_OBJECT_ID,
+            BUCKET_TWO_OBJECT_ID,
+            VOL_OBJECT_ID,
             KEY_FOUR_SIZE);
     writeDirToOm(reconOMMetadataManager, DIR_ONE_OBJECT_ID,
-            BUCKET_ONE_OBJECT_ID, DIR_ONE);
+            BUCKET_ONE_OBJECT_ID, BUCKET_ONE_OBJECT_ID,
+            VOL_OBJECT_ID, DIR_ONE);
     writeDirToOm(reconOMMetadataManager, DIR_TWO_OBJECT_ID,
-            DIR_ONE_OBJECT_ID, DIR_TWO);
+            DIR_ONE_OBJECT_ID, BUCKET_ONE_OBJECT_ID,
+            VOL_OBJECT_ID, DIR_TWO);
     writeDirToOm(reconOMMetadataManager, DIR_THREE_OBJECT_ID,
-            DIR_ONE_OBJECT_ID, DIR_THREE);
+            DIR_ONE_OBJECT_ID, BUCKET_ONE_OBJECT_ID,
+            VOL_OBJECT_ID, DIR_THREE);
   }
 
   private BucketLayout getBucketLayout() {
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/PrefixParser.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/PrefixParser.java
index 0ebc832456..bf3ccbeee7 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/PrefixParser.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/PrefixParser.java
@@ -45,6 +45,8 @@ import picocli.CommandLine;
 import picocli.CommandLine.Model.CommandSpec;
 import picocli.CommandLine.Spec;
 
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
 /**
  * Tool that parses OM db file for prefix table.
  */
@@ -164,6 +166,8 @@ public class PrefixParser implements Callable<Void>, SubcommandWithParent {
       return;
     }
 
+    final long volumeObjectId = metadataManager.getVolumeId(
+            info.getVolumeName());
     long lastObjectId = info.getObjectID();
     WithParentObjectId objectBucketId = new WithParentObjectId();
     objectBucketId.setObjectID(lastObjectId);
@@ -173,7 +177,8 @@ public class PrefixParser implements Callable<Void>, SubcommandWithParent {
     while (pathIterator.hasNext()) {
       Path elem = pathIterator.next();
       String path =
-          metadataManager.getOzonePathKey(lastObjectId, elem.toString());
+          metadataManager.getOzonePathKey(volumeObjectId, info.getObjectID(),
+                  lastObjectId, elem.toString());
       OmDirectoryInfo directoryInfo =
           metadataManager.getDirectoryTable().get(path);
 
@@ -198,10 +203,12 @@ public class PrefixParser implements Callable<Void>, SubcommandWithParent {
 
     // at the last level, now parse both file and dir table
     dumpTableInfo(Types.DIRECTORY, effectivePath,
-        metadataManager.getDirectoryTable(), lastObjectId);
+        metadataManager.getDirectoryTable(),
+            volumeObjectId, info.getObjectID(), lastObjectId);
 
     dumpTableInfo(Types.FILE, effectivePath,
-        metadataManager.getKeyTable(getBucketLayout()), lastObjectId);
+        metadataManager.getKeyTable(getBucketLayout()),
+            volumeObjectId, info.getObjectID(), lastObjectId);
     metadataManager.stop();
   }
 
@@ -211,9 +218,11 @@ public class PrefixParser implements Callable<Void>, SubcommandWithParent {
 
   private void dumpTableInfo(Types type,
       org.apache.hadoop.fs.Path effectivePath,
-      Table<String, ? extends WithParentObjectId> table, long lastObjectId)
+      Table<String, ? extends WithParentObjectId> table,
+      long volumeId, long bucketId, long lastObjectId)
       throws IOException {
-    MetadataKeyFilters.KeyPrefixFilter filter = getPrefixFilter(lastObjectId);
+    MetadataKeyFilters.KeyPrefixFilter filter = getPrefixFilter(
+            volumeId, bucketId, lastObjectId);
 
     List<? extends KeyValue
         <String, ? extends WithParentObjectId>> infoList =
@@ -243,9 +252,13 @@ public class PrefixParser implements Callable<Void>, SubcommandWithParent {
 
   }
 
-  private static MetadataKeyFilters.KeyPrefixFilter getPrefixFilter(long id) {
+  private static MetadataKeyFilters.KeyPrefixFilter getPrefixFilter(
+          long volumeId, long bucketId, long parentId) {
+    String key = OM_KEY_PREFIX + volumeId +
+            OM_KEY_PREFIX + bucketId +
+            OM_KEY_PREFIX + parentId;
     return (new MetadataKeyFilters.KeyPrefixFilter())
-        .addFilter(Long.toString(id));
+        .addFilter(key);
   }
 
   public int getParserStats(Types type) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org