Posted to commits@ozone.apache.org by ra...@apache.org on 2020/11/20 10:15:39 UTC

[ozone] 02/04: HDDS-4266: CreateFile : store parent dir entries into DirTable and file entry into separate FileTable (#1473)

This is an automated email from the ASF dual-hosted git repository.

rakeshr pushed a commit to branch HDDS-2939
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 4124a34ac96590b67cec84ec4d2f614ed4d74423
Author: Rakesh Radhakrishnan <ra...@apache.org>
AuthorDate: Tue Oct 13 22:48:35 2020 +0530

    HDDS-4266: CreateFile : store parent dir entries into DirTable and file entry into separate FileTable (#1473)
---
 .../apache/hadoop/ozone/om/helpers/OmKeyInfo.java  | 108 +++++++-
 .../apache/hadoop/fs/ozone/TestOzoneFileOps.java   | 231 +++++++++++++++++
 .../src/main/proto/OmClientProtocol.proto          |   1 +
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |  15 +-
 .../hadoop/ozone/om/codec/OmKeyInfoCodec.java      |   8 +
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  37 ++-
 .../om/ratis/utils/OzoneManagerRatisUtils.java     |  19 +-
 .../request/file/OMDirectoryCreateRequestV1.java   |  21 +-
 .../ozone/om/request/file/OMFileCreateRequest.java |  53 ++--
 .../om/request/file/OMFileCreateRequestV1.java     | 260 +++++++++++++++++++
 .../ozone/om/request/file/OMFileRequest.java       | 132 +++++++++-
 .../ozone/om/request/key/OMKeyCommitRequest.java   |  34 ++-
 ...ommitRequest.java => OMKeyCommitRequestV1.java} | 242 +++++++++---------
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  | 278 ++++++++++++---------
 .../om/response/file/OMFileCreateResponseV1.java   |  91 +++++++
 .../ozone/om/response/key/OMKeyCommitResponse.java |  19 ++
 ...mitResponse.java => OMKeyCommitResponseV1.java} |  58 ++---
 .../ozone/om/response/key/OMKeyCreateResponse.java |  18 +-
 .../ozone/om/request/TestOMRequestUtils.java       | 110 ++++++++
 .../file/TestOMDirectoryCreateRequestV1.java       |   1 +
 .../om/request/file/TestOMFileCreateRequest.java   |  91 ++++---
 .../om/request/file/TestOMFileCreateRequestV1.java | 192 ++++++++++++++
 .../om/request/key/TestOMKeyCommitRequest.java     |  74 ++++--
 .../om/request/key/TestOMKeyCommitRequestV1.java   | 106 ++++++++
 .../ozone/om/request/key/TestOMKeyRequest.java     |   9 +-
 .../response/file/TestOMFileCreateResponseV1.java  |  74 ++++++
 .../om/response/key/TestOMKeyCommitResponse.java   |  68 +++--
 .../om/response/key/TestOMKeyCommitResponseV1.java | 101 ++++++++
 .../om/response/key/TestOMKeyCreateResponse.java   |  38 +--
 .../ozone/om/response/key/TestOMKeyResponse.java   |  25 +-
 30 files changed, 2106 insertions(+), 408 deletions(-)

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
index d0e8bee..494e1b9 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
@@ -25,9 +25,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocationList;
 import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
@@ -52,6 +54,8 @@ public final class OmKeyInfo extends WithObjectID {
   private HddsProtos.ReplicationType type;
   private HddsProtos.ReplicationFactor factor;
   private FileEncryptionInfo encInfo;
+  private String fileName; // leaf node name
+  private long parentObjectID; // pointer to parent directory
 
   /**
    * ACL Information.
@@ -94,6 +98,22 @@ public final class OmKeyInfo extends WithObjectID {
     this.updateID = updateID;
   }
 
+  @SuppressWarnings("parameternumber")
+  OmKeyInfo(String volumeName, String bucketName, String keyName,
+            String fileName, List<OmKeyLocationInfoGroup> versions,
+            long dataSize, long creationTime, long modificationTime,
+            HddsProtos.ReplicationType type,
+            HddsProtos.ReplicationFactor factor,
+            Map<String, String> metadata,
+            FileEncryptionInfo encInfo, List<OzoneAcl> acls,
+            long parentObjectID, long objectID, long updateID) {
+    this(volumeName, bucketName, keyName, versions, dataSize,
+            creationTime, modificationTime, type, factor, metadata, encInfo,
+            acls, objectID, updateID);
+    this.fileName = fileName;
+    this.parentObjectID = parentObjectID;
+  }
+
   public String getVolumeName() {
     return volumeName;
   }
@@ -126,6 +146,19 @@ public final class OmKeyInfo extends WithObjectID {
     this.dataSize = size;
   }
 
+  public void setFileName(String fileName) {
+    this.fileName = fileName;
+  }
+
+  public String getFileName() {
+    return fileName;
+  }
+
+  public long getParentObjectID() {
+    return parentObjectID;
+  }
+
+
   public synchronized OmKeyLocationInfoGroup getLatestVersionLocations() {
     return keyLocationVersions.size() == 0? null :
         keyLocationVersions.get(keyLocationVersions.size() - 1);
@@ -267,6 +300,9 @@ public final class OmKeyInfo extends WithObjectID {
     private List<OzoneAcl> acls;
     private long objectID;
     private long updateID;
+    // not persisted to DB. FileName will be the last element in path keyName.
+    private String fileName;
+    private long parentObjectID;
 
     public Builder() {
       this.metadata = new HashMap<>();
@@ -369,11 +405,22 @@ public final class OmKeyInfo extends WithObjectID {
       return this;
     }
 
+    public Builder setFileName(String keyFileName) {
+      this.fileName = keyFileName;
+      return this;
+    }
+
+    public Builder setParentObjectID(long parentID) {
+      this.parentObjectID = parentID;
+      return this;
+    }
+
     public OmKeyInfo build() {
       return new OmKeyInfo(
-          volumeName, bucketName, keyName, omKeyLocationInfoGroups,
-          dataSize, creationTime, modificationTime, type, factor, metadata,
-          encInfo, acls, objectID, updateID);
+              volumeName, bucketName, keyName, fileName,
+              omKeyLocationInfoGroups, dataSize, creationTime,
+              modificationTime, type, factor, metadata, encInfo, acls,
+              parentObjectID, objectID, updateID);
     }
   }
 
@@ -386,11 +433,32 @@ public final class OmKeyInfo extends WithObjectID {
   }
 
   /**
+   * For network transmission.
+   *
+   * @param fullKeyName the user given full key name
+   * @return key info with the user given full key name
+   */
+  public KeyInfo getProtobuf(String fullKeyName) {
+    return getProtobuf(false, fullKeyName);
+  }
+
+  /**
    *
    * @param ignorePipeline true for persist to DB, false for network transmit.
    * @return
    */
   public KeyInfo getProtobuf(boolean ignorePipeline) {
+    return getProtobuf(ignorePipeline, null);
+  }
+
+  /**
+   * Gets KeyInfo with the user given key name.
+   *
+   * @param ignorePipeline   ignore pipeline flag
+   * @param fullKeyName user given key name
+   * @return key info object
+   */
+  private KeyInfo getProtobuf(boolean ignorePipeline, String fullKeyName) {
     long latestVersion = keyLocationVersions.size() == 0 ? -1 :
         keyLocationVersions.get(keyLocationVersions.size() - 1).getVersion();
 
@@ -402,7 +470,6 @@ public final class OmKeyInfo extends WithObjectID {
     KeyInfo.Builder kb = KeyInfo.newBuilder()
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
-        .setKeyName(keyName)
         .setDataSize(dataSize)
         .setFactor(factor)
         .setType(type)
@@ -413,7 +480,13 @@ public final class OmKeyInfo extends WithObjectID {
         .addAllMetadata(KeyValueUtil.toProtobuf(metadata))
         .addAllAcls(OzoneAclUtil.toProtobuf(acls))
         .setObjectID(objectID)
-        .setUpdateID(updateID);
+        .setUpdateID(updateID)
+        .setParentID(parentObjectID);
+    if (StringUtils.isNotBlank(fullKeyName)) {
+      kb.setKeyName(fullKeyName);
+    } else {
+      kb.setKeyName(keyName);
+    }
     if (encInfo != null) {
       kb.setFileEncryptionInfo(OMPBHelper.convert(encInfo));
     }
@@ -451,6 +524,11 @@ public final class OmKeyInfo extends WithObjectID {
     if (keyInfo.hasUpdateID()) {
       builder.setUpdateID(keyInfo.getUpdateID());
     }
+    if (keyInfo.hasParentID()) {
+      builder.setParentObjectID(keyInfo.getParentID());
+    }
+    // not persisted to DB. FileName is the last element of the keyName path.
+    builder.setFileName(OzoneFSUtils.getFileName(keyInfo.getKeyName()));
     return builder.build();
   }
 
@@ -464,6 +542,8 @@ public final class OmKeyInfo extends WithObjectID {
         ", creationTime='" + creationTime + '\'' +
         ", type='" + type + '\'' +
         ", factor='" + factor + '\'' +
+        ", objectID='" + objectID + '\'' +
+        ", parentID='" + parentObjectID + '\'' +
         '}';
   }
 
@@ -489,12 +569,13 @@ public final class OmKeyInfo extends WithObjectID {
         Objects.equals(metadata, omKeyInfo.metadata) &&
         Objects.equals(acls, omKeyInfo.acls) &&
         objectID == omKeyInfo.objectID &&
-        updateID == omKeyInfo.updateID;
+        updateID == omKeyInfo.updateID &&
+        parentObjectID == omKeyInfo.parentObjectID;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(volumeName, bucketName, keyName);
+    return Objects.hash(volumeName, bucketName, keyName, parentObjectID);
   }
 
   /**
@@ -511,8 +592,10 @@ public final class OmKeyInfo extends WithObjectID {
         .setReplicationType(type)
         .setReplicationFactor(factor)
         .setFileEncryptionInfo(encInfo)
-        .setObjectID(objectID).setUpdateID(updateID);
-
+        .setObjectID(objectID)
+        .setUpdateID(updateID)
+        .setParentObjectID(parentObjectID)
+        .setFileName(fileName);
 
     keyLocationVersions.forEach(keyLocationVersion ->
         builder.addOmKeyLocationInfoGroup(
@@ -540,4 +623,11 @@ public final class OmKeyInfo extends WithObjectID {
   public void clearFileEncryptionInfo() {
     this.encInfo = null;
   }
+
+  public String getPath() {
+    if (StringUtils.isBlank(getFileName())) {
+      return getKeyName();
+    }
+    return getParentObjectID() + OzoneConsts.OM_KEY_PREFIX + getFileName();
+  }
 }
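
For illustration of the new fileName/parentObjectID fields and the getPath()
accessor added above, here is a minimal sketch (not code from this patch; it
assumes OzoneConsts.OM_KEY_PREFIX is the usual "/" separator, a made-up parent
object id, and that the builder tolerates the other fields being left unset):

    import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;

    public class OmKeyInfoPathSketch {
      public static void main(String[] args) {
        // Build a key info carrying only the leaf node name and the parent pointer.
        OmKeyInfo fileInfo = new OmKeyInfo.Builder()
            .setFileName("file1")          // leaf node name of "/a/b/file1"
            .setParentObjectID(1025L)      // hypothetical object id of directory "b"
            .build();

        // getPath() returns "<parentObjectID>/<fileName>", e.g. "1025/file1",
        // which is the format used as the DB key of the file entry.
        System.out.println(fileInfo.getPath());
      }
    }

When fileName is blank, getPath() falls back to the full keyName, which keeps
the pre-V1 behaviour for existing entries.
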
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileOps.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileOps.java
new file mode 100644
index 0000000..d097268
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileOps.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.ozone;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.TestDataUtil;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_ITERATE_BATCH_SIZE;
+
+/**
+ * Test verifies the entries and operations in file table, open file table etc.
+ */
+public class TestOzoneFileOps {
+
+  @Rule
+  public Timeout timeout = new Timeout(300000);
+
+  private static final Logger LOG =
+          LoggerFactory.getLogger(TestOzoneFileOps.class);
+
+  private MiniOzoneCluster cluster;
+  private FileSystem fs;
+  private String volumeName;
+  private String bucketName;
+
+  @Before
+  public void setupOzoneFileSystem()
+          throws IOException, TimeoutException, InterruptedException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(FS_TRASH_INTERVAL_KEY, 1);
+    conf.set(OMConfigKeys.OZONE_OM_LAYOUT_VERSION, "V1");
+    conf.setBoolean(OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS, false);
+    cluster = MiniOzoneCluster.newBuilder(conf)
+            .setNumDatanodes(3)
+            .build();
+    cluster.waitForClusterToBeReady();
+    // create a volume and a bucket to be used by OzoneFileSystem
+    OzoneBucket bucket = TestDataUtil.createVolumeAndBucket(cluster);
+    volumeName = bucket.getVolumeName();
+    bucketName = bucket.getName();
+
+    String rootPath = String.format("%s://%s.%s/",
+            OzoneConsts.OZONE_URI_SCHEME, bucket.getName(),
+            bucket.getVolumeName());
+
+    // Set the fs.defaultFS and start the filesystem
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+    // Set the number of keys to be processed during batch operate.
+    conf.setInt(OZONE_FS_ITERATE_BATCH_SIZE, 5);
+    fs = FileSystem.get(conf);
+  }
+
+  @After
+  public void tearDown() {
+    IOUtils.closeQuietly(fs);
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test(timeout = 300_000)
+  public void testCreateFile() throws Exception {
+    // Op 1. create file -> /d1/d2/file1
+    Path parent = new Path("/d1/d2/");
+    Path file = new Path(parent, "file1");
+    FSDataOutputStream outputStream = fs.create(file);
+    String openFileKey = "";
+
+    OMMetadataManager omMgr = cluster.getOzoneManager().getMetadataManager();
+    OmBucketInfo omBucketInfo = omMgr.getBucketTable().get(
+            omMgr.getBucketKey(volumeName, bucketName));
+    Assert.assertNotNull("Failed to find bucketInfo", omBucketInfo);
+
+    ArrayList<String> dirKeys = new ArrayList<>();
+    long d1ObjectID = verifyDirKey(omBucketInfo.getObjectID(), "d1", "/d1",
+            dirKeys, omMgr);
+    long d2ObjectID = verifyDirKey(d1ObjectID, "d2", "/d1/d2", dirKeys,
+            omMgr);
+    openFileKey = d2ObjectID + OzoneConsts.OM_KEY_PREFIX + file.getName();
+
+    // verify entries in directory table
+    TableIterator<String, ? extends
+            Table.KeyValue<String, OmDirectoryInfo>> iterator =
+            omMgr.getDirectoryTable().iterator();
+    iterator.seekToFirst();
+    int count = dirKeys.size();
+    Assert.assertEquals("Unexpected directory table entries!", 2, count);
+    while (iterator.hasNext()) {
+      count--;
+      Table.KeyValue<String, OmDirectoryInfo> value = iterator.next();
+      verifyKeyFormat(value.getKey(), dirKeys);
+    }
+    Assert.assertEquals("Unexpected directory table entries!", 0, count);
+
+    // verify entries in open key table
+    TableIterator<String, ? extends
+            Table.KeyValue<String, OmKeyInfo>> keysItr =
+            omMgr.getOpenKeyTable().iterator();
+    keysItr.seekToFirst();
+
+    while (keysItr.hasNext()) {
+      count++;
+      Table.KeyValue<String, OmKeyInfo> value = keysItr.next();
+      verifyOpenKeyFormat(value.getKey(), openFileKey);
+      verifyOMFileInfoFormat(value.getValue(), file.getName(), d2ObjectID);
+    }
+    Assert.assertEquals("Unexpected file table entries!", 1, count);
+
+    // trigger CommitKeyRequest
+    outputStream.close();
+
+    Assert.assertTrue("Failed to commit the open file:" + openFileKey,
+            omMgr.getOpenKeyTable().isEmpty());
+
+    OmKeyInfo omKeyInfo = omMgr.getKeyTable().get(openFileKey);
+    Assert.assertNotNull("Invalid Key!", omKeyInfo);
+    verifyOMFileInfoFormat(omKeyInfo, file.getName(), d2ObjectID);
+  }
+
+  private void verifyOMFileInfoFormat(OmKeyInfo omKeyInfo, String fileName,
+                                      long parentID) {
+    Assert.assertEquals("Wrong keyName", fileName,
+            omKeyInfo.getKeyName());
+    Assert.assertEquals("Wrong parentID", parentID,
+            omKeyInfo.getParentObjectID());
+    String dbKey = parentID + OzoneConsts.OM_KEY_PREFIX + fileName;
+    Assert.assertEquals("Wrong path format", dbKey,
+            omKeyInfo.getPath());
+  }
+
+  /**
+   * Verify key name format and the DB key existence in the expected dirKeys
+   * list.
+   *
+   * @param key     table keyName
+   * @param dirKeys expected keyName
+   */
+  private void verifyKeyFormat(String key, ArrayList<String> dirKeys) {
+    String[] keyParts = StringUtils.split(key,
+            OzoneConsts.OM_KEY_PREFIX.charAt(0));
+    Assert.assertEquals("Invalid KeyName", 2, keyParts.length);
+    boolean removed = dirKeys.remove(key);
+    Assert.assertTrue("Key:" + key + " doesn't exists in directory table!",
+            removed);
+  }
+
+  /**
+   * Verify key name format and the DB key existence in the expected
+   * openFileKeys list.
+   *
+   * @param key          table keyName
+   * @param openFileKey expected keyName
+   */
+  private void verifyOpenKeyFormat(String key, String openFileKey) {
+    String[] keyParts = StringUtils.split(key,
+            OzoneConsts.OM_KEY_PREFIX.charAt(0));
+    Assert.assertEquals("Invalid KeyName:" + key, 3, keyParts.length);
+    String[] expectedOpenFileParts = StringUtils.split(openFileKey,
+            OzoneConsts.OM_KEY_PREFIX.charAt(0));
+    Assert.assertEquals("ParentId/Key:" + expectedOpenFileParts[0]
+                    + " doesn't exists in openFileTable!",
+            expectedOpenFileParts[0] + OzoneConsts.OM_KEY_PREFIX
+                    + expectedOpenFileParts[1],
+            keyParts[0] + OzoneConsts.OM_KEY_PREFIX + keyParts[1]);
+  }
+
+  long verifyDirKey(long parentId, String dirKey, String absolutePath,
+                    ArrayList<String> dirKeys, OMMetadataManager omMgr)
+          throws Exception {
+    String dbKey = parentId + OzoneConsts.OM_KEY_PREFIX + dirKey;
+    dirKeys.add(dbKey);
+    OmDirectoryInfo dirInfo = omMgr.getDirectoryTable().get(dbKey);
+    Assert.assertNotNull("Failed to find " + absolutePath +
+            " using dbKey: " + dbKey, dirInfo);
+    Assert.assertEquals("Parent Id mismatches", parentId,
+            dirInfo.getParentObjectID());
+    Assert.assertEquals("Mismatches directory name", dirKey,
+            dirInfo.getName());
+    Assert.assertTrue("Mismatches directory creation time param",
+            dirInfo.getCreationTime() > 0);
+    Assert.assertEquals("Mismatches directory modification time param",
+            dirInfo.getCreationTime(), dirInfo.getModificationTime());
+    return dirInfo.getObjectID();
+  }
+
+}
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index abbee63..32b3578 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -768,6 +768,7 @@ message KeyInfo {
     repeated OzoneAclInfo acls = 13;
     optional uint64 objectID = 14;
     optional uint64 updateID = 15;
+    optional uint64 parentID = 16;
 }
 
 message DirectoryInfo {
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index cdb59d2..337bcaf 100644
--- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -396,12 +396,23 @@ public interface OMMetadataManager {
   Set<String> listTableNames();
 
   /**
-   * Given a volume, bucket and a key, return the corresponding DB prefixKey
-   * key.
+   * Given a parent object id and a path component name, return the
+   * corresponding DB path key.
    *
    * @param parentObjectId - parent object Id
    * @param pathComponentName   - path component name
    * @return DB directory key as String.
    */
   String getOzonePathKey(long parentObjectId, String pathComponentName);
+
+  /**
+   * Returns the DB key name of an open file in the OM metadata store: the
+   * parent object id, file name and client id, joined by OM_KEY_PREFIX.
+   *
+   * @param parentObjectId - parent object Id
+   * @param fileName       - file name
+   * @param id             - client id for this open request
+   * @return DB open file key as String.
+   */
+  String getOpenFileName(long parentObjectId, String fileName, long id);
 }
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/codec/OmKeyInfoCodec.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/codec/OmKeyInfoCodec.java
index a7e1eab..8a28451 100644
--- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/codec/OmKeyInfoCodec.java
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/codec/OmKeyInfoCodec.java
@@ -30,6 +30,14 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Codec to encode OmKeyInfo as byte array.
+ *
+ * <p>
+ * If the layout version "ozone.om.layout.version" is V1 and
+ * "ozone.om.enable.filesystem.paths" is TRUE, the DB stores only the leaf
+ * node name in the 'keyName' field.
+ * <p>
+ * For example, the user given key path is '/a/b/c/d/e/file1', then in DB
+ * 'keyName' field stores only the leaf node name, which is 'file1'.
  */
 public class OmKeyInfoCodec implements Codec<OmKeyInfo> {
   private static final Logger LOG =
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 0fd8c76..3ce4ba1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.ozone.om.lock.OzoneManagerLock;
 import org.apache.hadoop.ozone.om.ratis.OMTransactionInfo;
 import org.apache.hadoop.ozone.storage.proto
     .OzoneManagerStorageProtos.PersistedUserVolumeInfo;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -130,6 +131,10 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
    * |----------------------------------------------------------------------|
    * |  directoryTable    | parentId/directoryName -> DirectoryInfo         |
    * |----------------------------------------------------------------------|
+   * |  fileTable         | parentId/fileName -> KeyInfo                    |
+   * |----------------------------------------------------------------------|
+   * |  openFileTable     | parentId/fileName/id -> KeyInfo                 |
+   * |----------------------------------------------------------------------|
    * |  transactionInfoTable | #TRANSACTIONINFO -> OMTransactionInfo        |
    * |----------------------------------------------------------------------|
    */
@@ -145,6 +150,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
   public static final String DELEGATION_TOKEN_TABLE = "dTokenTable";
   public static final String PREFIX_TABLE = "prefixTable";
   public static final String DIRECTORY_TABLE = "directoryTable";
+  public static final String FILE_TABLE = "fileTable";
+  public static final String OPEN_FILE_TABLE = "openFileTable";
   public static final String TRANSACTION_INFO_TABLE =
       "transactionInfoTable";
 
@@ -164,6 +171,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
   private Table dTokenTable;
   private Table prefixTable;
   private Table dirTable;
+  private Table fileTable;
+  private Table openFileTable;
   private Table transactionInfoTable;
   private boolean isRatisEnabled;
   private boolean ignorePipelineinKey;
@@ -202,7 +211,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
    * For subclass overriding.
    */
   protected OmMetadataManagerImpl() {
-    this.lock = new OzoneManagerLock(new OzoneConfiguration());
+    OzoneConfiguration conf = new OzoneConfiguration();
+    this.lock = new OzoneManagerLock(conf);
     this.openKeyExpireThresholdMS =
         OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT;
     this.omEpoch = 0;
@@ -229,6 +239,9 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
 
   @Override
   public Table<String, OmKeyInfo> getKeyTable() {
+    if (OzoneManagerRatisUtils.isOmLayoutVersionV1()) {
+      return fileTable;
+    }
     return keyTable;
   }
 
@@ -239,6 +252,9 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
 
   @Override
   public Table<String, OmKeyInfo> getOpenKeyTable() {
+    if (OzoneManagerRatisUtils.isOmLayoutVersionV1()) {
+      return openFileTable;
+    }
     return openKeyTable;
   }
 
@@ -345,6 +361,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
         .addTable(S3_SECRET_TABLE)
         .addTable(PREFIX_TABLE)
         .addTable(DIRECTORY_TABLE)
+        .addTable(FILE_TABLE)
+        .addTable(OPEN_FILE_TABLE)
         .addTable(TRANSACTION_INFO_TABLE)
         .addCodec(OzoneTokenIdentifier.class, new TokenIdentifierCodec())
         .addCodec(OmKeyInfo.class, new OmKeyInfoCodec(true))
@@ -416,6 +434,14 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
             OmDirectoryInfo.class);
     checkTableStatus(dirTable, DIRECTORY_TABLE);
 
+    fileTable = this.store.getTable(FILE_TABLE, String.class,
+            OmKeyInfo.class);
+    checkTableStatus(fileTable, FILE_TABLE);
+
+    openFileTable = this.store.getTable(OPEN_FILE_TABLE, String.class,
+            OmKeyInfo.class);
+    checkTableStatus(openFileTable, OPEN_FILE_TABLE);
+
     transactionInfoTable = this.store.getTable(TRANSACTION_INFO_TABLE,
         String.class, OMTransactionInfo.class);
     checkTableStatus(transactionInfoTable, TRANSACTION_INFO_TABLE);
@@ -1176,4 +1202,13 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
     return builder.toString();
   }
 
+  @Override
+  public String getOpenFileName(long parentID, String fileName,
+                                long id) {
+    StringBuilder openKey = new StringBuilder();
+    openKey.append(parentID);
+    openKey.append(OM_KEY_PREFIX).append(fileName);
+    openKey.append(OM_KEY_PREFIX).append(id);
+    return openKey.toString();
+  }
 }
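
Putting the new tables together, every DB key is anchored on a parent object id
rather than on the full path. A rough sketch of the key shapes produced by
getOzonePathKey() and getOpenFileName() for a user key "/a/b/file1" follows;
the object ids and client id are made up for illustration, and OM_KEY_PREFIX is
assumed to be "/":

    // Sketch only: ids are hypothetical.
    static void printV1DbKeys(org.apache.hadoop.ozone.om.OMMetadataManager omMgr) {
      // directoryTable:  "512/a"  -> OmDirectoryInfo(a)   (512  = bucket object id)
      //                  "1025/b" -> OmDirectoryInfo(b)   (1025 = object id of "a")
      long parentOfFile = 1026L;   // object id of directory "b"
      long clientId = 10001L;      // open-file session id

      // openFileTable key while the file is being written: "1026/file1/10001"
      String openFileKey = omMgr.getOpenFileName(parentOfFile, "file1", clientId);

      // fileTable key once the file is committed: "1026/file1"
      String fileKey = omMgr.getOzonePathKey(parentOfFile, "file1");

      System.out.println(openFileKey + " , " + fileKey);
    }
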
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index 1ea225b..d4c0f17 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -35,9 +35,11 @@ import org.apache.hadoop.ozone.om.request.bucket.acl.OMBucketSetAclRequest;
 import org.apache.hadoop.ozone.om.request.file.OMDirectoryCreateRequest;
 import org.apache.hadoop.ozone.om.request.file.OMDirectoryCreateRequestV1;
 import org.apache.hadoop.ozone.om.request.file.OMFileCreateRequest;
+import org.apache.hadoop.ozone.om.request.file.OMFileCreateRequestV1;
 import org.apache.hadoop.ozone.om.request.key.OMKeysDeleteRequest;
 import org.apache.hadoop.ozone.om.request.key.OMAllocateBlockRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequest;
+import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequestV1;
 import org.apache.hadoop.ozone.om.request.key.OMKeyCreateRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyDeleteRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyPurgeRequest;
@@ -86,7 +88,7 @@ public final class OzoneManagerRatisUtils {
 
   // TODO: Temporary workaround for OM upgrade path and will be replaced once
   //  upgrade HDDS-3698 story reaches consensus.
-  private static boolean omLayoutVersionV1 = true;
+  private static boolean omLayoutVersionV1 = false;
 
   private OzoneManagerRatisUtils() {
   }
@@ -138,6 +140,9 @@ public final class OzoneManagerRatisUtils {
     case CreateKey:
       return new OMKeyCreateRequest(omRequest);
     case CommitKey:
+      if (omLayoutVersionV1) {
+        return new OMKeyCommitRequestV1(omRequest);
+      }
       return new OMKeyCommitRequest(omRequest);
     case DeleteKey:
       return new OMKeyDeleteRequest(omRequest);
@@ -153,6 +158,9 @@ public final class OzoneManagerRatisUtils {
       }
       return new OMDirectoryCreateRequest(omRequest);
     case CreateFile:
+      if (omLayoutVersionV1) {
+        return new OMFileCreateRequestV1(omRequest);
+      }
       return new OMFileCreateRequest(omRequest);
     case PurgeKeys:
       return new OMKeyPurgeRequest(omRequest);
@@ -326,4 +334,13 @@ public final class OzoneManagerRatisUtils {
 
     return true;
   }
+
+  /**
+   * Returns true if the OM layout version is V1.
+   * @return true if the layout version is V1, false otherwise
+   */
+  public static boolean isOmLayoutVersionV1() {
+    return omLayoutVersionV1;
+  }
+
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestV1.java
index 8b0727a..c48ff78 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestV1.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestV1.java
@@ -147,8 +147,10 @@ public class OMDirectoryCreateRequestV1 extends OMDirectoryCreateRequest {
           omDirectoryResult == NONE) {
 
         // prepare all missing parents
-        missingParentInfos = OMDirectoryCreateRequestV1.getAllParentDirInfo(
-                ozoneManager, keyArgs, omPathInfo, trxnLogIndex);
+        missingParentInfos =
+                OMDirectoryCreateRequestV1.getAllMissingParentDirInfo(
+                        ozoneManager, keyArgs, omPathInfo, trxnLogIndex);
+
         // prepare leafNode dir
         OmDirectoryInfo dirInfo = createDirectoryInfoWithACL(
                 omPathInfo.getLeafNodeName(),
@@ -229,14 +231,15 @@ public class OMDirectoryCreateRequestV1 extends OMDirectoryCreateRequest {
 
   /**
    * Construct OmDirectoryInfo for every parent directory in missing list.
-   * @param ozoneManager
-   * @param keyArgs
-   * @param pathInfo list of parent directories to be created and its ACLs
-   * @param trxnLogIndex
-   * @return
-   * @throws IOException
+   *
+   * @param ozoneManager Ozone Manager
+   * @param keyArgs      key arguments
+   * @param pathInfo     list of parent directories to be created and its ACLs
+   * @param trxnLogIndex transaction log index id
+   * @return list of missing parent directories
+   * @throws IOException DB failure
    */
-  public static List<OmDirectoryInfo> getAllParentDirInfo(
+  public static List<OmDirectoryInfo> getAllMissingParentDirInfo(
           OzoneManager ozoneManager, KeyArgs keyArgs,
           OMFileRequest.OMPathInfoV1 pathInfo, long trxnLogIndex)
           throws IOException {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
index 6ca3cc3..e5cdd3e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
@@ -234,23 +234,10 @@ public class OMFileCreateRequest extends OMKeyRequest {
       List<OzoneAcl> inheritAcls = pathInfo.getAcls();
 
       // Check if a file or directory exists with same key name.
-      if (omDirectoryResult == FILE_EXISTS) {
-        if (!isOverWrite) {
-          throw new OMException("File " + keyName + " already exists",
-              OMException.ResultCodes.FILE_ALREADY_EXISTS);
-        }
-      } else if (omDirectoryResult == DIRECTORY_EXISTS) {
-        throw new OMException("Can not write to directory: " + keyName,
-            OMException.ResultCodes.NOT_A_FILE);
-      } else if (omDirectoryResult == FILE_EXISTS_IN_GIVENPATH) {
-        throw new OMException(
-            "Can not create file: " + keyName + " as there " +
-                "is already file in the given path",
-            OMException.ResultCodes.NOT_A_FILE);
-      }
+      checkDirectoryResult(keyName, isOverWrite, omDirectoryResult);
 
       if (!isRecursive) {
-        checkAllParentsExist(ozoneManager, keyArgs, pathInfo);
+        checkAllParentsExist(keyArgs, pathInfo);
       }
 
       // do open key
@@ -352,8 +339,40 @@ public class OMFileCreateRequest extends OMKeyRequest {
     return omClientResponse;
   }
 
-  private void checkAllParentsExist(OzoneManager ozoneManager,
-      KeyArgs keyArgs,
+  /**
+   * Verify the OM directory result.
+   *
+   * @param keyName           key name
+   * @param isOverWrite       true if an existing file can be overwritten
+   * @param omDirectoryResult directory result
+   * @throws OMException if a file or directory already exists in the path
+   */
+  protected void checkDirectoryResult(String keyName, boolean isOverWrite,
+      OMFileRequest.OMDirectoryResult omDirectoryResult) throws OMException {
+    if (omDirectoryResult == FILE_EXISTS) {
+      if (!isOverWrite) {
+        throw new OMException("File " + keyName + " already exists",
+            OMException.ResultCodes.FILE_ALREADY_EXISTS);
+      }
+    } else if (omDirectoryResult == DIRECTORY_EXISTS) {
+      throw new OMException("Can not write to directory: " + keyName,
+          OMException.ResultCodes.NOT_A_FILE);
+    } else if (omDirectoryResult == FILE_EXISTS_IN_GIVENPATH) {
+      throw new OMException(
+          "Can not create file: " + keyName + " as there " +
+              "is already file in the given path",
+          OMException.ResultCodes.NOT_A_FILE);
+    }
+  }
+
+  /**
+   * Verify that all parent directories of the key exist.
+   *
+   * @param keyArgs  key arguments
+   * @param pathInfo om path info
+   * @throws IOException if a parent directory does not exist
+   */
+  protected void checkAllParentsExist(KeyArgs keyArgs,
       OMFileRequest.OMPathInfo pathInfo) throws IOException {
     String keyName = keyArgs.getKeyName();
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestV1.java
new file mode 100644
index 0000000..d8452b3
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestV1.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.file;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.file.OMFileCreateResponse;
+import org.apache.hadoop.ozone.om.response.file.OMFileCreateResponseV1;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateFileRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateFileResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+
+/**
+ * Handles create file request layout version1.
+ */
+public class OMFileCreateRequestV1 extends OMFileCreateRequest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMFileCreateRequestV1.class);
+  public OMFileCreateRequestV1(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  @SuppressWarnings("methodlength")
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
+
+    CreateFileRequest createFileRequest = getOmRequest().getCreateFileRequest();
+    KeyArgs keyArgs = createFileRequest.getKeyArgs();
+    Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
+
+    String volumeName = keyArgs.getVolumeName();
+    String bucketName = keyArgs.getBucketName();
+    String keyName = keyArgs.getKeyName();
+
+    // if isRecursive is true, the file will be created even if parent
+    // directories do not exist.
+    boolean isRecursive = createFileRequest.getIsRecursive();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("File create for : " + volumeName + "/" + bucketName + "/"
+          + keyName + ":" + isRecursive);
+    }
+
+    // if isOverWrite is true, the file will be overwritten.
+    boolean isOverWrite = createFileRequest.getIsOverwrite();
+
+    OMMetrics omMetrics = ozoneManager.getMetrics();
+    omMetrics.incNumCreateFile();
+
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+
+    boolean acquiredLock = false;
+
+    OmVolumeArgs omVolumeArgs = null;
+    OmBucketInfo omBucketInfo = null;
+    final List<OmKeyLocationInfo> locations = new ArrayList<>();
+    List<OmDirectoryInfo> missingParentInfos;
+    int numKeysCreated = 0;
+
+    OMClientResponse omClientResponse = null;
+    OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
+        getOmRequest());
+    IOException exception = null;
+    Result result = null;
+    try {
+      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
+      volumeName = keyArgs.getVolumeName();
+      bucketName = keyArgs.getBucketName();
+
+      if (keyName.length() == 0) {
+        // Check if this is the root of the filesystem.
+        throw new OMException("Can not write to directory: " + keyName,
+                OMException.ResultCodes.NOT_A_FILE);
+      }
+
+      // check Acl
+      checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
+          IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
+
+      // acquire lock
+      acquiredLock = omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
+          volumeName, bucketName);
+
+      validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+
+      OmKeyInfo dbFileInfo = null;
+
+      OMFileRequest.OMPathInfoV1 pathInfoV1 =
+              OMFileRequest.verifyDirectoryKeysInPath(omMetadataManager,
+                      volumeName, bucketName, keyName, Paths.get(keyName));
+
+      if (pathInfoV1.getDirectoryResult()
+              == OMFileRequest.OMDirectoryResult.FILE_EXISTS) {
+        String dbFileKey = omMetadataManager.getOzonePathKey(
+                pathInfoV1.getLastKnownParentId(),
+                pathInfoV1.getLeafNodeName());
+        dbFileInfo = OMFileRequest.getOmKeyInfoFromFileTable(false,
+                omMetadataManager, dbFileKey, keyName);
+      }
+
+      // check if the file or directory already existed in OM
+      checkDirectoryResult(keyName, isOverWrite,
+              pathInfoV1.getDirectoryResult());
+
+      if (!isRecursive) {
+        checkAllParentsExist(keyArgs, pathInfoV1);
+      }
+
+      // add all missing parents to dir table
+      missingParentInfos =
+              OMDirectoryCreateRequestV1.getAllMissingParentDirInfo(
+                      ozoneManager, keyArgs, pathInfoV1, trxnLogIndex);
+
+      // total number of keys created.
+      numKeysCreated = missingParentInfos.size();
+
+      // do open key
+      OmBucketInfo bucketInfo = omMetadataManager.getBucketTable().get(
+          omMetadataManager.getBucketKey(volumeName, bucketName));
+
+      OmKeyInfo omFileInfo = prepareFileInfo(omMetadataManager, keyArgs,
+              dbFileInfo, keyArgs.getDataSize(), locations,
+              getFileEncryptionInfo(keyArgs), ozoneManager.getPrefixManager(),
+              bucketInfo, pathInfoV1, trxnLogIndex,
+              pathInfoV1.getLeafNodeObjectId(),
+              ozoneManager.isRatisEnabled());
+
+      long openVersion = omFileInfo.getLatestVersionLocations().getVersion();
+      long clientID = createFileRequest.getClientID();
+      String dbOpenFileName = omMetadataManager.getOpenFileName(
+              pathInfoV1.getLastKnownParentId(), pathInfoV1.getLeafNodeName(),
+              clientID);
+
+      // Append new blocks
+      List<OmKeyLocationInfo> newLocationList = keyArgs.getKeyLocationsList()
+          .stream().map(OmKeyLocationInfo::getFromProtobuf)
+          .collect(Collectors.toList());
+      omFileInfo.appendNewBlocks(newLocationList, false);
+
+      omVolumeArgs = getVolumeInfo(omMetadataManager, volumeName);
+      omBucketInfo = getBucketInfo(omMetadataManager, volumeName, bucketName);
+      // check volume quota
+      long preAllocatedSpace = newLocationList.size()
+          * ozoneManager.getScmBlockSize()
+          * omFileInfo.getFactor().getNumber();
+      checkVolumeQuotaInBytes(omVolumeArgs, preAllocatedSpace);
+
+      // Adding the cache entry for this openKey can be done outside the lock.
+      // Even if the bucket gets deleted in the meantime, the commitKey
+      // request will detect that the bucket no longer exists.
+      OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager,
+              dbOpenFileName, omFileInfo, pathInfoV1.getLeafNodeName(),
+              trxnLogIndex);
+
+      // Add cache entries for the prefix directories.
+      // Skip adding for the file key itself, until Key Commit.
+      OMFileRequest.addDirectoryTableCacheEntries(omMetadataManager,
+              Optional.absent(), Optional.of(missingParentInfos),
+              trxnLogIndex);
+
+      // update usedBytes atomically.
+      omVolumeArgs.getUsedBytes().add(preAllocatedSpace);
+      omBucketInfo.getUsedBytes().add(preAllocatedSpace);
+
+      // Prepare response. Sets user given full key name in the 'keyName'
+      // attribute in response object.
+      omResponse.setCreateFileResponse(CreateFileResponse.newBuilder()
+          .setKeyInfo(omFileInfo.getProtobuf(keyName))
+          .setID(clientID)
+          .setOpenVersion(openVersion).build())
+          .setCmdType(Type.CreateFile);
+      omClientResponse = new OMFileCreateResponseV1(omResponse.build(),
+              omFileInfo, missingParentInfos, clientID, omVolumeArgs,
+              omBucketInfo);
+
+      result = Result.SUCCESS;
+    } catch (IOException ex) {
+      result = Result.FAILURE;
+      exception = ex;
+      omMetrics.incNumCreateFileFails();
+      omResponse.setCmdType(Type.CreateFile);
+      omClientResponse = new OMFileCreateResponse(createErrorOMResponse(
+            omResponse, exception));
+    } finally {
+      addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
+          omDoubleBufferHelper);
+      if (acquiredLock) {
+        omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
+            bucketName);
+      }
+    }
+
+    // Audit Log outside the lock
+    auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
+        OMAction.CREATE_FILE, auditMap, exception,
+        getOmRequest().getUserInfo()));
+
+    switch (result) {
+    case SUCCESS:
+      omMetrics.incNumKeys(numKeysCreated);
+      LOG.debug("File created. Volume:{}, Bucket:{}, Key:{}", volumeName,
+          bucketName, keyName);
+      break;
+    case FAILURE:
+      LOG.error("File create failed. Volume:{}, Bucket:{}, Key{}.",
+          volumeName, bucketName, keyName, exception);
+      break;
+    default:
+      LOG.error("Unrecognized Result for OMFileCreateRequest: {}",
+          createFileRequest);
+    }
+
+    return omClientResponse;
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
index d5543ba..ce8d49b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.OzoneAcl;
@@ -34,6 +35,7 @@ import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -396,7 +398,7 @@ public final class OMFileRequest {
   /**
    * Adding directory info to the Table cache.
    *
-   * @param omMetadataManager  OM Metdata Manager
+   * @param omMetadataManager  OM Metadata Manager
    * @param dirInfo            directory info
    * @param missingParentInfos list of the parents to be added to DB
    * @param trxnLogIndex       transaction log index
@@ -422,4 +424,132 @@ public final class OMFileRequest {
     }
   }
 
+  /**
+   * Adding Key info to the openFile Table cache.
+   *
+   * @param omMetadataManager OM Metadata Manager
+   * @param dbOpenFileName    open file name key
+   * @param omFileInfo        key info
+   * @param fileName          file name
+   * @param trxnLogIndex      transaction log index
+   * The cached OmKeyInfo keeps only the leaf node name in the keyName field.
+   */
+  public static void addOpenFileTableCacheEntry(
+          OMMetadataManager omMetadataManager, String dbOpenFileName,
+          @Nullable OmKeyInfo omFileInfo, String fileName, long trxnLogIndex) {
+
+    Optional<OmKeyInfo> keyInfoOptional = Optional.absent();
+    if (omFileInfo != null) {
+      // New key format for the openFileTable.
+      // For example, the user given key path is '/a/b/c/d/e/file1', then in DB
+      // keyName field stores only the leaf node name, which is 'file1'.
+      omFileInfo.setKeyName(fileName);
+      keyInfoOptional = Optional.of(omFileInfo);
+    }
+
+    omMetadataManager.getOpenKeyTable().addCacheEntry(
+            new CacheKey<>(dbOpenFileName),
+            new CacheValue<>(keyInfoOptional, trxnLogIndex));
+  }
+
+  /**
+   * Adding Key info to the file table cache.
+   *
+   * @param omMetadataManager OM Metadata Manager
+   * @param dbFileKey         file name key
+   * @param omFileInfo        key info
+   * @param fileName          file name
+   * @param trxnLogIndex      transaction log index
+   * The cached OmKeyInfo keeps only the leaf node name in the keyName field.
+   */
+  public static void addFileTableCacheEntry(
+          OMMetadataManager omMetadataManager, String dbFileKey,
+          OmKeyInfo omFileInfo, String fileName, long trxnLogIndex) {
+
+    // New key format for the fileTable.
+    // For example, the user given key path is '/a/b/c/d/e/file1', then in DB
+    // keyName field stores only the leaf node name, which is 'file1'.
+    omFileInfo.setKeyName(fileName);
+
+    omMetadataManager.getKeyTable().addCacheEntry(
+            new CacheKey<>(dbFileKey),
+            new CacheValue<>(Optional.of(omFileInfo), trxnLogIndex));
+  }
+
+  /**
+   * Adding omKeyInfo to open file table.
+   *
+   * @param omMetadataMgr    OM Metadata Manager
+   * @param batchOp          batch of db operations
+   * @param omFileInfo       omKeyInfo
+   * @param openKeySessionID clientID
+   * @throws IOException DB failure
+   */
+  public static void addToOpenFileTable(OMMetadataManager omMetadataMgr,
+                                        BatchOperation batchOp,
+                                        OmKeyInfo omFileInfo,
+                                        long openKeySessionID)
+          throws IOException {
+
+    String dbOpenFileKey = omMetadataMgr.getOpenFileName(
+            omFileInfo.getParentObjectID(), omFileInfo.getFileName(),
+            openKeySessionID);
+
+    omMetadataMgr.getOpenKeyTable().putWithBatch(batchOp, dbOpenFileKey,
+            omFileInfo);
+  }
+
+  /**
+   * Adding omKeyInfo to file table.
+   *
+   * @param omMetadataMgr OM Metadata Manager
+   * @param batchOp       batch of db operations
+   * @param omFileInfo    omKeyInfo
+   * @throws IOException DB failure
+   */
+  public static void addToFileTable(OMMetadataManager omMetadataMgr,
+                                    BatchOperation batchOp,
+                                    OmKeyInfo omFileInfo)
+          throws IOException {
+
+    String dbFileKey = omMetadataMgr.getOzonePathKey(
+            omFileInfo.getParentObjectID(), omFileInfo.getFileName());
+
+    omMetadataMgr.getKeyTable().putWithBatch(batchOp,
+            dbFileKey, omFileInfo);
+  }
+
+  /**
+   * Gets om key info from open key table if openFileTable flag is true,
+   * otherwise gets it from the key table.
+   *
+   * @param openFileTable if true, get the key info from the openFileTable,
+   *                      otherwise from the fileTable
+   * @param omMetadataMgr OM Metadata Manager
+   * @param dbOpenFileKey open file key name in DB
+   * @param keyName       key name
+   * @return om key info
+   * @throws IOException DB failure
+   */
+  public static OmKeyInfo getOmKeyInfoFromFileTable(boolean openFileTable,
+      OMMetadataManager omMetadataMgr, String dbOpenFileKey, String keyName)
+          throws IOException {
+
+    OmKeyInfo dbOmKeyInfo;
+    if (openFileTable) {
+      dbOmKeyInfo = omMetadataMgr.getOpenKeyTable().get(dbOpenFileKey);
+    } else {
+      dbOmKeyInfo = omMetadataMgr.getKeyTable().get(dbOpenFileKey);
+    }
+
+    // DB OMKeyInfo will store only fileName into keyName field. This
+    // function is to set user given keyName into the OmKeyInfo object.
+    // For example, the user given key path is '/a/b/c/d/e/file1', then in DB
+    // keyName field stores only the leaf node name, which is 'file1'.
+    if (dbOmKeyInfo != null) {
+      dbOmKeyInfo.setKeyName(keyName);
+    }
+    return dbOmKeyInfo;
+  }
+
 }
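
The helpers above are the building blocks the V1 commit path is expected to
stitch together: read the open entry (restoring the user-visible key name for
the client response), reset keyName to the leaf node name for persistence,
write the entry into the fileTable, and remove the open entry in the same
batch. A condensed sketch of that flow (an illustration only, with locking,
quota accounting and error handling omitted; it assumes the usual imports of
OMMetadataManager, OmKeyInfo, BatchOperation, OMFileRequest and IOException):

    static void commitFileSketch(OMMetadataManager omMgr, BatchOperation batchOp,
        long parentId, String fileName, String fullKeyName, long clientId)
        throws IOException {
      // 1. Locate the open entry written by OMFileCreateRequestV1:
      //    key format is "<parentId>/<fileName>/<clientId>".
      String dbOpenFileKey = omMgr.getOpenFileName(parentId, fileName, clientId);
      OmKeyInfo omKeyInfo = OMFileRequest.getOmKeyInfoFromFileTable(
          true /* read from openFileTable */, omMgr, dbOpenFileKey, fullKeyName);

      // 2. DB convention: keyName stores only the leaf node name.
      omKeyInfo.setKeyName(fileName);

      // 3. Persist it into the fileTable; addToFileTable() builds the DB key as
      //    "<parentObjectID>/<fileName>" via getOzonePathKey().
      OMFileRequest.addToFileTable(omMgr, batchOp, omKeyInfo);

      // 4. Remove the open entry in the same batch.
      omMgr.getOpenKeyTable().deleteWithBatch(batchOp, dbOpenFileKey);
    }
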
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
index b1d47de..32ecc2c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
@@ -228,6 +228,30 @@ public class OMKeyCommitRequest extends OMKeyRequest {
     auditLog(auditLogger, buildAuditMessage(OMAction.COMMIT_KEY, auditMap,
           exception, getOmRequest().getUserInfo()));
 
+    processResult(commitKeyRequest, volumeName, bucketName, keyName, omMetrics,
+            exception, omKeyInfo, result);
+
+    return omClientResponse;
+  }
+
+  /**
+   * Process result of om request execution.
+   *
+   * @param commitKeyRequest commit key request
+   * @param volumeName       volume name
+   * @param bucketName       bucket name
+   * @param keyName          key name
+   * @param omMetrics        om metrics
+   * @param exception        exception trace
+   * @param omKeyInfo        omKeyInfo
+   * @param result           stores the result of the execution
+   */
+  @SuppressWarnings("parameternumber")
+  protected void processResult(CommitKeyRequest commitKeyRequest,
+                               String volumeName, String bucketName,
+                               String keyName, OMMetrics omMetrics,
+                               IOException exception, OmKeyInfo omKeyInfo,
+                               Result result) {
     switch (result) {
     case SUCCESS:
       // As when we commit the key, then it is visible in ozone, so we should
@@ -239,18 +263,16 @@ public class OMKeyCommitRequest extends OMKeyRequest {
         omMetrics.incNumKeys();
       }
       LOG.debug("Key committed. Volume:{}, Bucket:{}, Key:{}", volumeName,
-          bucketName, keyName);
+              bucketName, keyName);
       break;
     case FAILURE:
-      LOG.error("Key commit failed. Volume:{}, Bucket:{}, Key:{}.",
-          volumeName, bucketName, keyName, exception);
+      LOG.error("Key commit failed. Volume:{}, Bucket:{}, Key:{}. Exception:{}",
+              volumeName, bucketName, keyName, exception);
       omMetrics.incNumKeyCommitFails();
       break;
     default:
       LOG.error("Unrecognized Result for OMKeyCommitRequest: {}",
-          commitKeyRequest);
+              commitKeyRequest);
     }
-
-    return omClientResponse;
   }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestV1.java
similarity index 54%
copy from hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
copy to hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestV1.java
index b1d47de..e985a9a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestV1.java
@@ -18,87 +18,57 @@
 
 package org.apache.hadoop.ozone.om.request.key;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.ozone.OmUtils;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.om.OMConfigKeys;
-import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
-import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
-import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
-import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.hadoop.ozone.audit.AuditLogger;
 import org.apache.hadoop.ozone.audit.OMAction;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
-import org.apache.hadoop.ozone.om.response.key.OMKeyCommitResponse;
+import org.apache.hadoop.ozone.om.response.key.OMKeyCommitResponseV1;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CommitKeyRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
-import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE;
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
 
 /**
- * Handles CommitKey request.
+ * Handles CommitKey request layout version V1.
  */
-public class OMKeyCommitRequest extends OMKeyRequest {
+public class OMKeyCommitRequestV1 extends OMKeyCommitRequest {
 
   private static final Logger LOG =
-      LoggerFactory.getLogger(OMKeyCommitRequest.class);
+          LoggerFactory.getLogger(OMKeyCommitRequestV1.class);
 
-  public OMKeyCommitRequest(OMRequest omRequest) {
+  public OMKeyCommitRequestV1(OMRequest omRequest) {
     super(omRequest);
   }
 
   @Override
-  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
-    CommitKeyRequest commitKeyRequest = getOmRequest().getCommitKeyRequest();
-    Preconditions.checkNotNull(commitKeyRequest);
-
-    KeyArgs keyArgs = commitKeyRequest.getKeyArgs();
-
-    // Verify key name
-    final boolean checkKeyNameEnabled = ozoneManager.getConfiguration()
-         .getBoolean(OMConfigKeys.OZONE_OM_KEYNAME_CHARACTER_CHECK_ENABLED_KEY,
-                 OMConfigKeys.OZONE_OM_KEYNAME_CHARACTER_CHECK_ENABLED_DEFAULT);
-    if(checkKeyNameEnabled){
-      OmUtils.validateKeyName(StringUtils.removeEnd(keyArgs.getKeyName(),
-              OzoneConsts.FS_FILE_COPYING_TEMP_SUFFIX));
-    }
-
-    KeyArgs.Builder newKeyArgs =
-        keyArgs.toBuilder().setModificationTime(Time.now())
-            .setKeyName(validateAndNormalizeKey(
-                ozoneManager.getEnableFileSystemPaths(), keyArgs.getKeyName()));
-
-    return getOmRequest().toBuilder()
-        .setCommitKeyRequest(commitKeyRequest.toBuilder()
-            .setKeyArgs(newKeyArgs)).setUserInfo(getUserInfo()).build();
-  }
-
-  @Override
   @SuppressWarnings("methodlength")
   public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
       long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
@@ -119,7 +89,7 @@ public class OMKeyCommitRequest extends OMKeyRequest {
     Map<String, String> auditMap = buildKeyArgsAuditMap(commitKeyArgs);
 
     OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
-        getOmRequest());
+            getOmRequest());
 
     IOException exception = null;
     OmKeyInfo omKeyInfo = null;
@@ -138,14 +108,13 @@ public class OMKeyCommitRequest extends OMKeyRequest {
 
       // check Acl
       checkKeyAclsInOpenKeyTable(ozoneManager, volumeName, bucketName,
-          keyName, IAccessAuthorizer.ACLType.WRITE,
-          commitKeyRequest.getClientID());
+              keyName, IAccessAuthorizer.ACLType.WRITE,
+              commitKeyRequest.getClientID());
 
-      String dbOzoneKey =
-          omMetadataManager.getOzoneKey(volumeName, bucketName,
-              keyName);
-      String dbOpenKey = omMetadataManager.getOpenKey(volumeName, bucketName,
-          keyName, commitKeyRequest.getClientID());
+
+      String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
+      Iterator<Path> pathComponents = Paths.get(keyName).iterator();
+      String dbOpenFileKey = null;
 
       List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
       for (KeyLocation keyLocation : commitKeyArgs.getKeyLocationsList()) {
@@ -153,25 +122,25 @@ public class OMKeyCommitRequest extends OMKeyRequest {
       }
 
       bucketLockAcquired =
-          omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
-              volumeName, bucketName);
+              omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
+                      volumeName, bucketName);
 
       validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
 
-      // Check for directory exists with same name, if it exists throw error. 
-      if (ozoneManager.getEnableFileSystemPaths()) {
-        if (checkDirectoryAlreadyExists(volumeName, bucketName, keyName,
-            omMetadataManager)) {
-          throw new OMException("Can not create file: " + keyName +
-              " as there is already directory in the given path", NOT_A_FILE);
-        }
-      }
-
-
-      omKeyInfo = omMetadataManager.getOpenKeyTable().get(dbOpenKey);
+      String fileName = OzoneFSUtils.getFileName(keyName);
+      omBucketInfo = omMetadataManager.getBucketTable().get(bucketKey);
+      long bucketId = omBucketInfo.getObjectID();
+      long parentID = getParentID(bucketId, pathComponents, keyName,
+              omMetadataManager, ozoneManager);
+      String dbFileKey = omMetadataManager.getOzonePathKey(parentID, fileName);
+      dbOpenFileKey = omMetadataManager.getOpenFileName(parentID, fileName,
+              commitKeyRequest.getClientID());
+
+      omKeyInfo = OMFileRequest.getOmKeyInfoFromFileTable(true,
+              omMetadataManager, dbOpenFileKey, keyName);
       if (omKeyInfo == null) {
-        throw new OMException("Failed to commit key, as " + dbOpenKey +
-            "entry is not found in the OpenKey table", KEY_NOT_FOUND);
+        throw new OMException("Failed to commit key, as " + dbOpenFileKey +
+                " entry is not found in the OpenKey table", KEY_NOT_FOUND);
       }
       omKeyInfo.setDataSize(commitKeyArgs.getDataSize());
 
@@ -184,73 +153,120 @@ public class OMKeyCommitRequest extends OMKeyRequest {
       omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
 
       // Add to cache of open key table and key table.
-      omMetadataManager.getOpenKeyTable().addCacheEntry(
-          new CacheKey<>(dbOpenKey),
-          new CacheValue<>(Optional.absent(), trxnLogIndex));
+      OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager, dbFileKey,
+              null, fileName, trxnLogIndex);
 
-      omMetadataManager.getKeyTable().addCacheEntry(
-          new CacheKey<>(dbOzoneKey),
-          new CacheValue<>(Optional.of(omKeyInfo), trxnLogIndex));
+      OMFileRequest.addFileTableCacheEntry(omMetadataManager, dbFileKey,
+              omKeyInfo, fileName, trxnLogIndex);
 
       long scmBlockSize = ozoneManager.getScmBlockSize();
       int factor = omKeyInfo.getFactor().getNumber();
       omVolumeArgs = getVolumeInfo(omMetadataManager, volumeName);
-      omBucketInfo = getBucketInfo(omMetadataManager, volumeName, bucketName);
       // update usedBytes atomically.
       // Block was pre-requested and UsedBytes updated when createKey and
       // AllocatedBlock. The space occupied by the Key shall be based on
       // the actual Key size, and the total Block size applied before should
       // be subtracted.
       long correctedSpace = omKeyInfo.getDataSize() * factor -
-          locationInfoList.size() * scmBlockSize * factor;
+              locationInfoList.size() * scmBlockSize * factor;
       omVolumeArgs.getUsedBytes().add(correctedSpace);
       omBucketInfo.getUsedBytes().add(correctedSpace);
 
-      omClientResponse = new OMKeyCommitResponse(omResponse.build(),
-          omKeyInfo, dbOzoneKey, dbOpenKey, omVolumeArgs, omBucketInfo);
+      omClientResponse = new OMKeyCommitResponseV1(omResponse.build(),
+              omKeyInfo, dbFileKey, dbOpenFileKey, omVolumeArgs, omBucketInfo);
 
       result = Result.SUCCESS;
     } catch (IOException ex) {
       result = Result.FAILURE;
       exception = ex;
-      omClientResponse = new OMKeyCommitResponse(createErrorOMResponse(
-          omResponse, exception));
+      omClientResponse = new OMKeyCommitResponseV1(createErrorOMResponse(
+              omResponse, exception));
     } finally {
       addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
-          omDoubleBufferHelper);
+              omDoubleBufferHelper);
 
       if(bucketLockAcquired) {
         omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
-            bucketName);
+                bucketName);
       }
     }
 
     auditLog(auditLogger, buildAuditMessage(OMAction.COMMIT_KEY, auditMap,
-          exception, getOmRequest().getUserInfo()));
-
-    switch (result) {
-    case SUCCESS:
-      // As when we commit the key, then it is visible in ozone, so we should
-      // increment here.
-      // As key also can have multiple versions, we need to increment keys
-      // only if version is 0. Currently we have not complete support of
-      // versioning of keys. So, this can be revisited later.
-      if (omKeyInfo.getKeyLocationVersions().size() == 1) {
-        omMetrics.incNumKeys();
+            exception, getOmRequest().getUserInfo()));
+
+    processResult(commitKeyRequest, volumeName, bucketName, keyName, omMetrics,
+            exception, omKeyInfo, result);
+
+    return omClientResponse;
+  }
+
+
+  /**
+   * Check whether a directory already exists with the same name; if so,
+   * throw an error.
+   *
+   * @param keyName                  key name
+   * @param ozoneManager             Ozone Manager
+   * @param reachedLastPathComponent true if the path component is the fileName
+   * @throws IOException if a directory already exists with the same name
+   */
+  private void checkDirectoryAlreadyExists(String keyName,
+                                           OzoneManager ozoneManager,
+                                           boolean reachedLastPathComponent)
+          throws IOException {
+    // The last path component already exists as a directory; a file with the
+    // same name cannot be created when file system paths are enabled.
+    if (reachedLastPathComponent && ozoneManager.getEnableFileSystemPaths()) {
+      throw new OMException("Can not create file: " + keyName +
+              " as there is already directory in the given path", NOT_A_FILE);
+    }
+  }
+
+  /**
+   * Get the parent id for the user-given path.
+   *
+   * @param bucketId          bucket id
+   * @param pathComponents    file path elements
+   * @param keyName           user given key name
+   * @param omMetadataManager metadata manager
+   * @param ozoneManager      Ozone Manager
+   * @return lastKnownParentID
+   * @throws IOException DB failure, or a parent is missing from the
+   *                     DirectoryTable
+   */
+  private long getParentID(long bucketId, Iterator<Path> pathComponents,
+                           String keyName, OMMetadataManager omMetadataManager,
+                           OzoneManager ozoneManager)
+          throws IOException {
+
+    long lastKnownParentId = bucketId;
+
+    // If no sub-dirs then bucketID is the root/parent.
+    if(!pathComponents.hasNext()){
+      return bucketId;
+    }
+
+    OmDirectoryInfo omDirectoryInfo;
+    while (pathComponents.hasNext()) {
+      String nodeName = pathComponents.next().toString();
+      boolean reachedLastPathComponent = !pathComponents.hasNext();
+      String dbNodeName =
+              omMetadataManager.getOzonePathKey(lastKnownParentId, nodeName);
+
+      omDirectoryInfo = omMetadataManager.
+              getDirectoryTable().get(dbNodeName);
+      if (omDirectoryInfo != null) {
+        checkDirectoryAlreadyExists(keyName, ozoneManager,
+                reachedLastPathComponent);
+        lastKnownParentId = omDirectoryInfo.getObjectID();
+      } else {
+        // One of the sub-directories doesn't exist in the DB. The immediate
+        // parent must exist to commit the key; otherwise fail the operation.
+        if (!reachedLastPathComponent) {
+          throw new OMException("Failed to commit key, as parent directory of "
+                  + keyName + " entry is not found in DirectoryTable",
+                  KEY_NOT_FOUND);
+        }
+        break;
       }
-      LOG.debug("Key committed. Volume:{}, Bucket:{}, Key:{}", volumeName,
-          bucketName, keyName);
-      break;
-    case FAILURE:
-      LOG.error("Key commit failed. Volume:{}, Bucket:{}, Key:{}.",
-          volumeName, bucketName, keyName, exception);
-      omMetrics.incNumKeyCommitFails();
-      break;
-    default:
-      LOG.error("Unrecognized Result for OMKeyCommitRequest: {}",
-          commitKeyRequest);
     }
 
-    return omClientResponse;
+    return lastKnownParentId;
   }
 }
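
A minimal sketch of one step of the parent lookup that getParentID() above
performs (assuming an initialized OMMetadataManager and that directory "a"
sits directly under the bucket); repeating it per path component yields the
immediate parent of the committed file:

    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
    long bucketId =
        omMetadataManager.getBucketTable().get(bucketKey).getObjectID();
    // Resolve the first component of "a/b/c/file1" under the bucket.
    String dbNodeName = omMetadataManager.getOzonePathKey(bucketId, "a");
    OmDirectoryInfo dirA =
        omMetadataManager.getDirectoryTable().get(dbNodeName);
    // dirA.getObjectID() becomes lastKnownParentId for resolving "b", and so
    // on until only the leaf file name remains.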
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index f1e2bfc..9344403 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
@@ -250,38 +251,6 @@ public abstract class OMKeyRequest extends OMClientRequest {
     return edek;
   }
 
-  /**
-   * Create OmKeyInfo object.
-   * @return OmKeyInfo
-   */
-  @SuppressWarnings("parameterNumber")
-  protected OmKeyInfo createKeyInfo(@Nonnull KeyArgs keyArgs,
-      @Nonnull List<OmKeyLocationInfo> locations,
-      @Nonnull HddsProtos.ReplicationFactor factor,
-      @Nonnull HddsProtos.ReplicationType type, long size,
-      @Nullable FileEncryptionInfo encInfo,
-      @Nonnull PrefixManager prefixManager,
-      @Nullable OmBucketInfo omBucketInfo,
-      long transactionLogIndex, long objectID) {
-    return new OmKeyInfo.Builder()
-        .setVolumeName(keyArgs.getVolumeName())
-        .setBucketName(keyArgs.getBucketName())
-        .setKeyName(keyArgs.getKeyName())
-        .setOmKeyLocationInfos(Collections.singletonList(
-            new OmKeyLocationInfoGroup(0, locations)))
-        .setCreationTime(keyArgs.getModificationTime())
-        .setModificationTime(keyArgs.getModificationTime())
-        .setDataSize(size)
-        .setReplicationType(type)
-        .setReplicationFactor(factor)
-        .setFileEncryptionInfo(encInfo)
-        .setAcls(getAclsForKey(keyArgs, omBucketInfo, prefixManager))
-        .addAllMetadata(KeyValueUtil.getFromProtobuf(keyArgs.getMetadataList()))
-        .setObjectID(objectID)
-        .setUpdateID(transactionLogIndex)
-        .build();
-  }
-
   private List< OzoneAcl > getAclsForKey(KeyArgs keyArgs,
       OmBucketInfo bucketInfo, PrefixManager prefixManager) {
     List<OzoneAcl> acls = new ArrayList<>();
@@ -321,96 +290,6 @@ public abstract class OMKeyRequest extends OMClientRequest {
   }
 
   /**
-   * Prepare OmKeyInfo which will be persisted to openKeyTable.
-   * @return OmKeyInfo
-   * @throws IOException
-   */
-  @SuppressWarnings("parameternumber")
-  protected OmKeyInfo prepareKeyInfo(
-      @Nonnull OMMetadataManager omMetadataManager,
-      @Nonnull KeyArgs keyArgs, OmKeyInfo dbKeyInfo, long size,
-      @Nonnull List<OmKeyLocationInfo> locations,
-      @Nullable FileEncryptionInfo encInfo,
-      @Nonnull PrefixManager prefixManager,
-      @Nullable OmBucketInfo omBucketInfo,
-      long transactionLogIndex,
-      @Nonnull long objectID,
-      boolean isRatisEnabled)
-      throws IOException {
-    if (keyArgs.getIsMultipartKey()) {
-      return prepareMultipartKeyInfo(omMetadataManager, keyArgs,
-          size, locations, encInfo, prefixManager, omBucketInfo,
-          transactionLogIndex, objectID);
-      //TODO args.getMetadata
-    }
-    if (dbKeyInfo != null) {
-      // TODO: Need to be fixed, as when key already exists, we are
-      //  appending new blocks to existing key.
-      // The key already exist, the new blocks will be added as new version
-      // when locations.size = 0, the new version will have identical blocks
-      // as its previous version
-      dbKeyInfo.addNewVersion(locations, false);
-      dbKeyInfo.setDataSize(size + dbKeyInfo.getDataSize());
-      // The modification time is set in preExecute. Use the same
-      // modification time.
-      dbKeyInfo.setModificationTime(keyArgs.getModificationTime());
-      dbKeyInfo.setUpdateID(transactionLogIndex, isRatisEnabled);
-      return dbKeyInfo;
-    }
-
-    // the key does not exist, create a new object.
-    // Blocks will be appended as version 0.
-    return createKeyInfo(keyArgs, locations, keyArgs.getFactor(),
-        keyArgs.getType(), keyArgs.getDataSize(), encInfo, prefixManager,
-        omBucketInfo, transactionLogIndex, objectID);
-  }
-
-  /**
-   * Prepare OmKeyInfo for multi-part upload part key which will be persisted
-   * to openKeyTable.
-   * @return OmKeyInfo
-   * @throws IOException
-   */
-  @SuppressWarnings("parameternumber")
-  private OmKeyInfo prepareMultipartKeyInfo(
-      @Nonnull OMMetadataManager omMetadataManager,
-      @Nonnull KeyArgs args, long size,
-      @Nonnull List<OmKeyLocationInfo> locations,
-      FileEncryptionInfo encInfo,  @Nonnull PrefixManager prefixManager,
-      @Nullable OmBucketInfo omBucketInfo, @Nonnull long transactionLogIndex,
-      @Nonnull long objectId)
-      throws IOException {
-    HddsProtos.ReplicationFactor factor;
-    HddsProtos.ReplicationType type;
-
-    Preconditions.checkArgument(args.getMultipartNumber() > 0,
-        "PartNumber Should be greater than zero");
-    // When key is multipart upload part key, we should take replication
-    // type and replication factor from original key which has done
-    // initiate multipart upload. If we have not found any such, we throw
-    // error no such multipart upload.
-    String uploadID = args.getMultipartUploadID();
-    Preconditions.checkNotNull(uploadID);
-    String multipartKey = omMetadataManager
-        .getMultipartKey(args.getVolumeName(), args.getBucketName(),
-            args.getKeyName(), uploadID);
-    OmKeyInfo partKeyInfo = omMetadataManager.getOpenKeyTable().get(
-        multipartKey);
-    if (partKeyInfo == null) {
-      throw new OMException("No such Multipart upload is with specified " +
-          "uploadId " + uploadID,
-          OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
-    } else {
-      factor = partKeyInfo.getFactor();
-      type = partKeyInfo.getType();
-    }
-    // For this upload part we don't need to check in KeyTable. As this
-    // is not an actual key, it is a part of the key.
-    return createKeyInfo(args, locations, factor, type, size, encInfo,
-        prefixManager, omBucketInfo, transactionLogIndex, objectId);
-  }
-
-  /**
    * Check Acls for the ozone bucket.
    * @param ozoneManager
    * @param volume
@@ -428,7 +307,6 @@ public abstract class OMKeyRequest extends OMClientRequest {
     }
   }
 
-
   /**
    * Check Acls for the ozone key.
    * @param ozoneManager
@@ -684,4 +562,158 @@ public abstract class OMKeyRequest extends OMClientRequest {
         new CacheKey<>(omMetadataManager.getBucketKey(volume, bucket)))
         .getCacheValue();
   }
+
+  /**
+   * Prepare OmKeyInfo which will be persisted to openKeyTable.
+   * @return OmKeyInfo
+   * @throws IOException
+   */
+  @SuppressWarnings("parameternumber")
+  protected OmKeyInfo prepareKeyInfo(
+          @Nonnull OMMetadataManager omMetadataManager,
+          @Nonnull KeyArgs keyArgs, OmKeyInfo dbKeyInfo, long size,
+          @Nonnull List<OmKeyLocationInfo> locations,
+          @Nullable FileEncryptionInfo encInfo,
+          @Nonnull PrefixManager prefixManager,
+          @Nullable OmBucketInfo omBucketInfo,
+          long transactionLogIndex, long objectID, boolean isRatisEnabled)
+          throws IOException {
+
+    return prepareFileInfo(omMetadataManager, keyArgs, dbKeyInfo, size,
+            locations, encInfo, prefixManager, omBucketInfo, null,
+            transactionLogIndex, objectID, isRatisEnabled);
+  }
+
+  /**
+   * Prepare OmKeyInfo which will be persisted to openKeyTable.
+   * @return OmKeyInfo
+   * @throws IOException
+   */
+  @SuppressWarnings("parameternumber")
+  protected OmKeyInfo prepareFileInfo(
+          @Nonnull OMMetadataManager omMetadataManager,
+          @Nonnull KeyArgs keyArgs, OmKeyInfo dbKeyInfo, long size,
+          @Nonnull List<OmKeyLocationInfo> locations,
+          @Nullable FileEncryptionInfo encInfo,
+          @Nonnull PrefixManager prefixManager,
+          @Nullable OmBucketInfo omBucketInfo,
+          OMFileRequest.OMPathInfoV1 omPathInfo,
+          long transactionLogIndex, long objectID,
+          boolean isRatisEnabled)
+          throws IOException {
+    if (keyArgs.getIsMultipartKey()) {
+      return prepareMultipartFileInfo(omMetadataManager, keyArgs,
+              size, locations, encInfo, prefixManager, omBucketInfo,
+              omPathInfo, transactionLogIndex, objectID);
+      //TODO args.getMetadata
+    }
+    if (dbKeyInfo != null) {
+      // TODO: Need to be fixed, as when key already exists, we are
+      //  appending new blocks to existing key.
+      // The key already exist, the new blocks will be added as new version
+      // when locations.size = 0, the new version will have identical blocks
+      // as its previous version
+      dbKeyInfo.addNewVersion(locations, false);
+      dbKeyInfo.setDataSize(size + dbKeyInfo.getDataSize());
+      // The modification time is set in preExecute. Use the same
+      // modification time.
+      dbKeyInfo.setModificationTime(keyArgs.getModificationTime());
+      dbKeyInfo.setUpdateID(transactionLogIndex, isRatisEnabled);
+      return dbKeyInfo;
+    }
+
+    // the key does not exist, create a new object.
+    // Blocks will be appended as version 0.
+    return createFileInfo(keyArgs, locations, keyArgs.getFactor(),
+            keyArgs.getType(), keyArgs.getDataSize(), encInfo, prefixManager,
+            omBucketInfo, omPathInfo, transactionLogIndex, objectID);
+  }
+
+  /**
+   * Create OmKeyInfo object.
+   * @return OmKeyInfo
+   */
+  @SuppressWarnings("parameterNumber")
+  protected OmKeyInfo createFileInfo(@Nonnull KeyArgs keyArgs,
+      @Nonnull List<OmKeyLocationInfo> locations,
+      @Nonnull HddsProtos.ReplicationFactor factor,
+      @Nonnull HddsProtos.ReplicationType type, long size,
+      @Nullable FileEncryptionInfo encInfo,
+      @Nonnull PrefixManager prefixManager,
+      @Nullable OmBucketInfo omBucketInfo,
+      OMFileRequest.OMPathInfoV1 omPathInfo,
+      long transactionLogIndex, long objectID) {
+
+    OmKeyInfo.Builder builder = new OmKeyInfo.Builder();
+    builder.setVolumeName(keyArgs.getVolumeName())
+            .setBucketName(keyArgs.getBucketName())
+            .setKeyName(keyArgs.getKeyName())
+            .setOmKeyLocationInfos(Collections.singletonList(
+                    new OmKeyLocationInfoGroup(0, locations)))
+            .setCreationTime(keyArgs.getModificationTime())
+            .setModificationTime(keyArgs.getModificationTime())
+            .setDataSize(size)
+            .setReplicationType(type)
+            .setReplicationFactor(factor)
+            .setFileEncryptionInfo(encInfo)
+            .setAcls(getAclsForKey(keyArgs, omBucketInfo, prefixManager))
+            .addAllMetadata(KeyValueUtil.getFromProtobuf(
+                    keyArgs.getMetadataList()))
+            .setUpdateID(transactionLogIndex);
+    if (omPathInfo != null) {
+      // FileTable metadata format
+      objectID = omPathInfo.getLeafNodeObjectId();
+      builder.setParentObjectID(omPathInfo.getLastKnownParentId());
+      builder.setFileName(omPathInfo.getLeafNodeName());
+    }
+    builder.setObjectID(objectID);
+    return builder.build();
+  }
+
+  /**
+   * Prepare OmKeyInfo for multi-part upload part key which will be persisted
+   * to openKeyTable.
+   * @return OmKeyInfo
+   * @throws IOException
+   */
+  @SuppressWarnings("parameternumber")
+  private OmKeyInfo prepareMultipartFileInfo(
+          @Nonnull OMMetadataManager omMetadataManager,
+          @Nonnull KeyArgs args, long size,
+          @Nonnull List<OmKeyLocationInfo> locations,
+          FileEncryptionInfo encInfo,  @Nonnull PrefixManager prefixManager,
+          @Nullable OmBucketInfo omBucketInfo,
+          OMFileRequest.OMPathInfoV1 omPathInfo,
+          @Nonnull long transactionLogIndex, long objectID)
+          throws IOException {
+    HddsProtos.ReplicationFactor factor;
+    HddsProtos.ReplicationType type;
+
+    Preconditions.checkArgument(args.getMultipartNumber() > 0,
+            "PartNumber Should be greater than zero");
+    // When key is multipart upload part key, we should take replication
+    // type and replication factor from original key which has done
+    // initiate multipart upload. If we have not found any such, we throw
+    // error no such multipart upload.
+    String uploadID = args.getMultipartUploadID();
+    Preconditions.checkNotNull(uploadID);
+    String multipartKey = omMetadataManager
+            .getMultipartKey(args.getVolumeName(), args.getBucketName(),
+                    args.getKeyName(), uploadID);
+    OmKeyInfo partKeyInfo = omMetadataManager.getOpenKeyTable().get(
+            multipartKey);
+    if (partKeyInfo == null) {
+      throw new OMException("No such Multipart upload is with specified " +
+              "uploadId " + uploadID,
+              OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
+    } else {
+      factor = partKeyInfo.getFactor();
+      type = partKeyInfo.getType();
+    }
+    // For this upload part we don't need to check in KeyTable. As this
+    // is not an actual key, it is a part of the key.
+    return createFileInfo(args, locations, factor, type, size, encInfo,
+            prefixManager, omBucketInfo, omPathInfo, transactionLogIndex,
+            objectID);
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMFileCreateResponseV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMFileCreateResponseV1.java
new file mode 100644
index 0000000..a168d8f
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMFileCreateResponseV1.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.file;
+
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_FILE_TABLE;
+
+/**
+ * Response for create file request layout version V1.
+ */
+@CleanupTableInfo(cleanupTables = OPEN_FILE_TABLE)
+public class OMFileCreateResponseV1 extends OMFileCreateResponse {
+
+  private List<OmDirectoryInfo> parentDirInfos;
+
+  public OMFileCreateResponseV1(@Nonnull OMResponse omResponse,
+                                @Nonnull OmKeyInfo omKeyInfo,
+                                @Nonnull List<OmDirectoryInfo> parentDirInfos,
+                                long openKeySessionID,
+                                @Nonnull OmVolumeArgs omVolumeArgs,
+                                @Nonnull OmBucketInfo omBucketInfo) {
+    super(omResponse, omKeyInfo, new ArrayList<>(), openKeySessionID,
+        omVolumeArgs, omBucketInfo);
+    this.parentDirInfos = parentDirInfos;
+  }
+
+  @Override
+  public void addToDBBatch(OMMetadataManager omMetadataMgr,
+                              BatchOperation batchOp) throws IOException {
+
+    /**
+     * Create parent directory entries during Key Create - do not wait
+     * for Key Commit request.
+     * XXX handle stale directory entries.
+     */
+    if (parentDirInfos != null) {
+      for (OmDirectoryInfo parentDirInfo : parentDirInfos) {
+        String parentKey = parentDirInfo.getPath();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("putWithBatch adding parent : key {} info : {}", parentKey,
+                  parentDirInfo);
+        }
+        omMetadataMgr.getDirectoryTable().putWithBatch(batchOp, parentKey,
+                parentDirInfo);
+      }
+    }
+
+    OMFileRequest.addToOpenFileTable(omMetadataMgr, batchOp, getOmKeyInfo(),
+            getOpenKeySessionID());
+
+    // update volume usedBytes.
+    omMetadataMgr.getVolumeTable().putWithBatch(batchOp,
+            omMetadataMgr.getVolumeKey(getOmVolumeArgs().getVolume()),
+            getOmVolumeArgs());
+    // update bucket usedBytes.
+    omMetadataMgr.getBucketTable().putWithBatch(batchOp,
+            omMetadataMgr.getBucketKey(getOmVolumeArgs().getVolume(),
+                    getOmBucketInfo().getBucketName()), getOmBucketInfo());
+  }
+
+}
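
As a rough illustration, for a create of key "a/b/c/file1" where none of the
parents exist yet, the batch above is expected to land entries in four tables
(keys shown schematically; the exact encoding comes from
OmDirectoryInfo#getPath and OMMetadataManager#getOpenFileName):

    // DirectoryTable : <bucketId>/a, <idA>/b, <idB>/c     (parentDirInfos)
    // OpenFileTable  : <idC>/file1 + clientID             (open key session)
    // VolumeTable    : volume entry with updated usedBytes
    // BucketTable    : bucket entry with updated usedBytes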
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
index aede2ec..6661900 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
@@ -86,4 +86,23 @@ public class OMKeyCommitResponse extends OMClientResponse {
             omBucketInfo.getBucketName()), omBucketInfo);
   }
 
+  protected String getOpenKeyName() {
+    return openKeyName;
+  }
+
+  protected OmKeyInfo getOmKeyInfo() {
+    return omKeyInfo;
+  }
+
+  protected OmVolumeArgs getOmVolumeArgs() {
+    return omVolumeArgs;
+  }
+
+  protected OmBucketInfo getOmBucketInfo() {
+    return omBucketInfo;
+  }
+
+  protected String getOzoneKeyName() {
+    return ozoneKeyName;
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseV1.java
similarity index 59%
copy from hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
copy to hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseV1.java
index aede2ec..bff5554 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseV1.java
@@ -18,72 +18,64 @@
 
 package org.apache.hadoop.ozone.om.response.key;
 
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
-import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
-import org.apache.hadoop.hdds.utils.db.BatchOperation;
 
-import java.io.IOException;
 import javax.annotation.Nonnull;
 
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_KEY_TABLE;
+import java.io.IOException;
+
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_FILE_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE;
 
 /**
- * Response for CommitKey request.
+ * Response for CommitKey request layout version V1.
  */
-@CleanupTableInfo(cleanupTables = {OPEN_KEY_TABLE, KEY_TABLE})
-public class OMKeyCommitResponse extends OMClientResponse {
-
-  private OmKeyInfo omKeyInfo;
-  private String ozoneKeyName;
-  private String openKeyName;
-  private OmVolumeArgs omVolumeArgs;
-  private OmBucketInfo omBucketInfo;
+@CleanupTableInfo(cleanupTables = {OPEN_FILE_TABLE, FILE_TABLE})
+public class OMKeyCommitResponseV1 extends OMKeyCommitResponse {
 
-  public OMKeyCommitResponse(@Nonnull OMResponse omResponse,
-      @Nonnull OmKeyInfo omKeyInfo, String ozoneKeyName, String openKeyName,
-      @Nonnull OmVolumeArgs omVolumeArgs, @Nonnull OmBucketInfo omBucketInfo) {
-    super(omResponse);
-    this.omKeyInfo = omKeyInfo;
-    this.ozoneKeyName = ozoneKeyName;
-    this.openKeyName = openKeyName;
-    this.omVolumeArgs = omVolumeArgs;
-    this.omBucketInfo = omBucketInfo;
+  public OMKeyCommitResponseV1(@Nonnull OMResponse omResponse,
+                               @Nonnull OmKeyInfo omKeyInfo,
+                               String ozoneKeyName, String openKeyName,
+                               @Nonnull OmVolumeArgs omVolumeArgs,
+                               @Nonnull OmBucketInfo omBucketInfo) {
+    super(omResponse, omKeyInfo, ozoneKeyName, openKeyName, omVolumeArgs,
+            omBucketInfo);
   }
 
   /**
    * For when the request is not successful.
    * For a successful request, the other constructor should be used.
    */
-  public OMKeyCommitResponse(@Nonnull OMResponse omResponse) {
+  public OMKeyCommitResponseV1(@Nonnull OMResponse omResponse) {
     super(omResponse);
     checkStatusNotOK();
   }
 
   @Override
   public void addToDBBatch(OMMetadataManager omMetadataManager,
-      BatchOperation batchOperation) throws IOException {
+                           BatchOperation batchOperation) throws IOException {
 
     // Delete from OpenKey table
     omMetadataManager.getOpenKeyTable().deleteWithBatch(batchOperation,
-        openKeyName);
+            getOpenKeyName());
 
-    omMetadataManager.getKeyTable().putWithBatch(batchOperation, ozoneKeyName,
-        omKeyInfo);
+    OMFileRequest.addToFileTable(omMetadataManager, batchOperation,
+            getOmKeyInfo());
 
     // update volume usedBytes.
     omMetadataManager.getVolumeTable().putWithBatch(batchOperation,
-        omMetadataManager.getVolumeKey(omVolumeArgs.getVolume()),
-        omVolumeArgs);
+            omMetadataManager.getVolumeKey(getOmVolumeArgs().getVolume()),
+            getOmVolumeArgs());
     // update bucket usedBytes.
     omMetadataManager.getBucketTable().putWithBatch(batchOperation,
-        omMetadataManager.getBucketKey(omVolumeArgs.getVolume(),
-            omBucketInfo.getBucketName()), omBucketInfo);
+            omMetadataManager.getBucketKey(getOmVolumeArgs().getVolume(),
+                    getOmBucketInfo().getBucketName()), getOmBucketInfo());
   }
-
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCreateResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCreateResponse.java
index 86224a1..eca2578 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCreateResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCreateResponse.java
@@ -73,7 +73,7 @@ public class OMKeyCreateResponse extends OMClientResponse {
   }
 
   @Override
-  protected void addToDBBatch(OMMetadataManager omMetadataManager,
+  public void addToDBBatch(OMMetadataManager omMetadataManager,
       BatchOperation batchOperation) throws IOException {
 
     /**
@@ -109,5 +109,21 @@ public class OMKeyCreateResponse extends OMClientResponse {
         omMetadataManager.getBucketKey(omVolumeArgs.getVolume(),
             omBucketInfo.getBucketName()), omBucketInfo);
   }
+
+  protected long getOpenKeySessionID() {
+    return openKeySessionID;
+  }
+
+  protected OmKeyInfo getOmKeyInfo() {
+    return omKeyInfo;
+  }
+
+  protected OmVolumeArgs getOmVolumeArgs() {
+    return omVolumeArgs;
+  }
+
+  protected OmBucketInfo getOmBucketInfo() {
+    return omBucketInfo;
+  }
 }
 
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java
index b32a116..776abf1 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType;
 
 import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
 import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
@@ -812,4 +813,113 @@ public final class TestOMRequestUtils {
         new CacheKey<>(dbVolumeKey),
         new CacheValue<>(Optional.of(omVolumeArgs), 1L));
   }
+
+  /**
+   * Create OmKeyInfo.
+   */
+  @SuppressWarnings("parameterNumber")
+  public static OmKeyInfo createOmKeyInfo(String volumeName, String bucketName,
+      String keyName, HddsProtos.ReplicationType replicationType,
+      HddsProtos.ReplicationFactor replicationFactor, long objectID,
+      long parentID, long trxnLogIndex, long creationTime) {
+    String fileName = OzoneFSUtils.getFileName(keyName);
+    return new OmKeyInfo.Builder()
+            .setVolumeName(volumeName)
+            .setBucketName(bucketName)
+            .setKeyName(keyName)
+            .setOmKeyLocationInfos(Collections.singletonList(
+                    new OmKeyLocationInfoGroup(0, new ArrayList<>())))
+            .setCreationTime(creationTime)
+            .setModificationTime(Time.now())
+            .setDataSize(1000L)
+            .setReplicationType(replicationType)
+            .setReplicationFactor(replicationFactor)
+            .setObjectID(objectID)
+            .setUpdateID(trxnLogIndex)
+            .setParentObjectID(parentID)
+            .setFileName(fileName)
+            .build();
+  }
+
+
+  /**
+   * Add a key entry to the KeyTable. If the openKeyTable flag is true, adds
+   * the entry to the openKeyTable; otherwise adds it to the keyTable.
+   *
+   * @throws Exception DB failure
+   */
+  public static void addFileToKeyTable(boolean openKeyTable,
+                                       boolean addToCache, String fileName,
+                                       OmKeyInfo omKeyInfo,
+                                       long clientID, long trxnLogIndex,
+                                       OMMetadataManager omMetadataManager)
+          throws Exception {
+    if (openKeyTable) {
+      String ozoneKey = omMetadataManager.getOpenFileName(
+              omKeyInfo.getParentObjectID(), fileName, clientID);
+      if (addToCache) {
+        omMetadataManager.getOpenKeyTable().addCacheEntry(
+                new CacheKey<>(ozoneKey),
+                new CacheValue<>(Optional.of(omKeyInfo), trxnLogIndex));
+      }
+      omMetadataManager.getOpenKeyTable().put(ozoneKey, omKeyInfo);
+    } else {
+      String ozoneKey = omMetadataManager.getOzonePathKey(
+              omKeyInfo.getParentObjectID(), fileName);
+      if (addToCache) {
+        omMetadataManager.getKeyTable().addCacheEntry(new CacheKey<>(ozoneKey),
+                new CacheValue<>(Optional.of(omKeyInfo), trxnLogIndex));
+      }
+      omMetadataManager.getKeyTable().put(ozoneKey, omKeyInfo);
+    }
+  }
+
+  /**
+   * Gets bucketId from OM metadata manager.
+   *
+   * @param volumeName        volume name
+   * @param bucketName        bucket name
+   * @param omMetadataManager metadata manager
+   * @return bucket Id
+   * @throws Exception DB failure
+   */
+  public static long getBucketId(String volumeName, String bucketName,
+                                 OMMetadataManager omMetadataManager)
+          throws Exception {
+    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
+    OmBucketInfo omBucketInfo =
+            omMetadataManager.getBucketTable().get(bucketKey);
+    return omBucketInfo.getObjectID();
+  }
+
+  /**
+   * Add path components to the directory table and return the last
+   * directory's object id.
+   *
+   * @param volumeName volume name
+   * @param bucketName bucket name
+   * @param key        key name
+   * @param omMetaMgr  metadata manager
+   * @return last directory object id
+   * @throws Exception DB failure
+   */
+  public static long addParentsToDirTable(String volumeName, String bucketName,
+                                    String key, OMMetadataManager omMetaMgr)
+          throws Exception {
+    long bucketId = TestOMRequestUtils.getBucketId(volumeName, bucketName,
+            omMetaMgr);
+    String[] pathComponents = StringUtils.split(key, '/');
+    long objectId = bucketId + 10;
+    long parentId = bucketId;
+    long txnID = 50;
+    for (String pathElement : pathComponents) {
+      OmDirectoryInfo omDirInfo =
+              TestOMRequestUtils.createOmDirectoryInfo(pathElement, ++objectId,
+                      parentId);
+      TestOMRequestUtils.addDirKeyToDirTable(true, omDirInfo,
+              txnID, omMetaMgr);
+      parentId = omDirInfo.getObjectID();
+    }
+    return parentId;
+  }
 }
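
A minimal sketch of how these new test helpers compose (illustrative values;
assumes the volume and bucket have already been added to the DB):

    long parentId = TestOMRequestUtils.addParentsToDirTable(volumeName,
        bucketName, "a/b/c", omMetadataManager);
    OmKeyInfo omKeyInfo = TestOMRequestUtils.createOmKeyInfo(volumeName,
        bucketName, "a/b/c/file1", HddsProtos.ReplicationType.RATIS,
        HddsProtos.ReplicationFactor.ONE, parentId + 1, parentId,
        100L, Time.now());
    // Put the file entry straight into the key table, keyed by
    // parentId + fileName as in the V1 layout.
    TestOMRequestUtils.addFileToKeyTable(false, false, "file1",
        omKeyInfo, -1, 100L, omMetadataManager);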
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequestV1.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequestV1.java
index 77cf74b..f0f0320 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequestV1.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequestV1.java
@@ -88,6 +88,7 @@ public class TestOMDirectoryCreateRequestV1 {
     OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
     ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
             folder.newFolder().getAbsolutePath());
+    ozoneConfiguration.set(OMConfigKeys.OZONE_OM_LAYOUT_VERSION, "V1");
     omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration);
     when(ozoneManager.getMetrics()).thenReturn(omMetrics);
     when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMFileCreateRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMFileCreateRequest.java
index c7aa6be..5010d0a 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMFileCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMFileCreateRequest.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.om.request.file;
 import java.util.List;
 import java.util.UUID;
 
+import org.jetbrains.annotations.NotNull;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -55,8 +56,7 @@ public class TestOMFileCreateRequest extends TestOMKeyRequest {
         HddsProtos.ReplicationFactor.ONE, HddsProtos.ReplicationType.RATIS,
         false, false);
 
-    OMFileCreateRequest omFileCreateRequest =
-        new OMFileCreateRequest(omRequest);
+    OMFileCreateRequest omFileCreateRequest = getOMFileCreateRequest(omRequest);
 
     OMRequest modifiedOmRequest = omFileCreateRequest.preExecute(ozoneManager);
     Assert.assertNotEquals(omRequest, modifiedOmRequest);
@@ -96,8 +96,7 @@ public class TestOMFileCreateRequest extends TestOMKeyRequest {
         HddsProtos.ReplicationFactor.ONE, HddsProtos.ReplicationType.RATIS,
         false, false);
 
-    OMFileCreateRequest omFileCreateRequest = new OMFileCreateRequest(
-        omRequest);
+    OMFileCreateRequest omFileCreateRequest = getOMFileCreateRequest(omRequest);
 
     OMRequest modifiedOmRequest = omFileCreateRequest.preExecute(ozoneManager);
     Assert.assertNotEquals(omRequest, modifiedOmRequest);
@@ -121,21 +120,17 @@ public class TestOMFileCreateRequest extends TestOMKeyRequest {
 
     TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
         omMetadataManager);
-    OMFileCreateRequest omFileCreateRequest = new OMFileCreateRequest(
-        omRequest);
+    OMFileCreateRequest omFileCreateRequest = getOMFileCreateRequest(omRequest);
 
     OMRequest modifiedOmRequest = omFileCreateRequest.preExecute(ozoneManager);
 
     long id = modifiedOmRequest.getCreateFileRequest().getClientID();
 
-    String openKey = omMetadataManager.getOpenKey(volumeName, bucketName,
-        keyName, id);
-
     // Before calling
-    OmKeyInfo omKeyInfo = omMetadataManager.getOpenKeyTable().get(openKey);
+    OmKeyInfo omKeyInfo = verifyPathInOpenKeyTable(keyName, id, false);
     Assert.assertNull(omKeyInfo);
 
-    omFileCreateRequest = new OMFileCreateRequest(modifiedOmRequest);
+    omFileCreateRequest = getOMFileCreateRequest(modifiedOmRequest);
 
     OMClientResponse omFileCreateResponse =
         omFileCreateRequest.validateAndUpdateCache(ozoneManager, 100L,
@@ -146,8 +141,7 @@ public class TestOMFileCreateRequest extends TestOMKeyRequest {
 
     // Check open table whether key is added or not.
 
-    omKeyInfo = omMetadataManager.getOpenKeyTable().get(openKey);
-    Assert.assertNotNull(omKeyInfo);
+    omKeyInfo = verifyPathInOpenKeyTable(keyName, id, true);
 
     List< OmKeyLocationInfo > omKeyLocationInfoList =
         omKeyInfo.getLatestVersionLocations().getLocationList();
@@ -179,12 +173,11 @@ public class TestOMFileCreateRequest extends TestOMKeyRequest {
         HddsProtos.ReplicationFactor.ONE, HddsProtos.ReplicationType.RATIS,
             false, true);
 
-    OMFileCreateRequest omFileCreateRequest = new OMFileCreateRequest(
-        omRequest);
+    OMFileCreateRequest omFileCreateRequest = getOMFileCreateRequest(omRequest);
 
     OMRequest modifiedOmRequest = omFileCreateRequest.preExecute(ozoneManager);
 
-    omFileCreateRequest = new OMFileCreateRequest(modifiedOmRequest);
+    omFileCreateRequest = getOMFileCreateRequest(modifiedOmRequest);
 
     OMClientResponse omFileCreateResponse =
         omFileCreateRequest.validateAndUpdateCache(ozoneManager, 100L,
@@ -200,13 +193,11 @@ public class TestOMFileCreateRequest extends TestOMKeyRequest {
         false, true);
 
     TestOMRequestUtils.addVolumeToDB(volumeName, omMetadataManager);
-    OMFileCreateRequest omFileCreateRequest = new OMFileCreateRequest(
-        omRequest);
+    OMFileCreateRequest omFileCreateRequest = getOMFileCreateRequest(omRequest);
 
     OMRequest modifiedOmRequest = omFileCreateRequest.preExecute(ozoneManager);
 
-    omFileCreateRequest = new OMFileCreateRequest(modifiedOmRequest);
-
+    omFileCreateRequest = getOMFileCreateRequest(modifiedOmRequest);
 
     OMClientResponse omFileCreateResponse =
         omFileCreateRequest.validateAndUpdateCache(ozoneManager, 100L,
@@ -311,8 +302,7 @@ public class TestOMFileCreateRequest extends TestOMKeyRequest {
     testNonRecursivePath(key, false, false, true);
   }
 
-
-  private void testNonRecursivePath(String key,
+  protected void testNonRecursivePath(String key,
       boolean overWrite, boolean recursive, boolean fail) throws Exception {
     OMRequest omRequest = createFileRequest(volumeName, bucketName, key,
         HddsProtos.ReplicationFactor.ONE, HddsProtos.ReplicationType.RATIS,
@@ -320,12 +310,11 @@ public class TestOMFileCreateRequest extends TestOMKeyRequest {
 
     TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
         omMetadataManager);
-    OMFileCreateRequest omFileCreateRequest = new OMFileCreateRequest(
-        omRequest);
+    OMFileCreateRequest omFileCreateRequest = getOMFileCreateRequest(omRequest);
 
     OMRequest modifiedOmRequest = omFileCreateRequest.preExecute(ozoneManager);
 
-    omFileCreateRequest = new OMFileCreateRequest(modifiedOmRequest);
+    omFileCreateRequest = getOMFileCreateRequest(modifiedOmRequest);
 
     OMClientResponse omFileCreateResponse =
         omFileCreateRequest.validateAndUpdateCache(ozoneManager, 100L,
@@ -341,10 +330,9 @@ public class TestOMFileCreateRequest extends TestOMKeyRequest {
       Assert.assertTrue(omFileCreateResponse.getOMResponse().getSuccess());
       long id = modifiedOmRequest.getCreateFileRequest().getClientID();
 
-      String openKey = omMetadataManager.getOpenKey(volumeName, bucketName,
-          key, id);
-      OmKeyInfo omKeyInfo = omMetadataManager.getOpenKeyTable().get(openKey);
-      Assert.assertNotNull(omKeyInfo);
+      verifyKeyNameInCreateFileResponse(key, omFileCreateResponse);
+
+      OmKeyInfo omKeyInfo = verifyPathInOpenKeyTable(key, id, true);
 
       List< OmKeyLocationInfo > omKeyLocationInfoList =
           omKeyInfo.getLatestVersionLocations().getLocationList();
@@ -368,6 +356,14 @@ public class TestOMFileCreateRequest extends TestOMKeyRequest {
     }
   }
 
+  private void verifyKeyNameInCreateFileResponse(String key,
+      OMClientResponse omFileCreateResponse) {
+    OzoneManagerProtocolProtos.CreateFileResponse createFileResponse =
+            omFileCreateResponse.getOMResponse().getCreateFileResponse();
+    String actualFileName = createFileResponse.getKeyInfo().getKeyName();
+    Assert.assertEquals("Incorrect keyName", key, actualFileName);
+  }
+
   /**
    * Create OMRequest which encapsulates OMFileCreateRequest.
    * @param volumeName
@@ -377,7 +373,8 @@ public class TestOMFileCreateRequest extends TestOMKeyRequest {
    * @param replicationType
    * @return OMRequest
    */
-  private OMRequest createFileRequest(
+  @NotNull
+  protected OMRequest createFileRequest(
       String volumeName, String bucketName, String keyName,
       HddsProtos.ReplicationFactor replicationFactor,
       HddsProtos.ReplicationType replicationType, boolean overWrite,
@@ -399,4 +396,38 @@ public class TestOMFileCreateRequest extends TestOMKeyRequest {
         .setCreateFileRequest(createFileRequest).build();
 
   }
+
+  /**
+   * Verify path in open key table. Also, it returns OMKeyInfo for the given
+   * key path.
+   *
+   * @param key      key name
+   * @param id       client id
+   * @param doAssert if true, assert that the key exists; otherwise skip it.
+   * @return om key info for the given key path.
+   * @throws Exception DB failure
+   */
+  protected OmKeyInfo verifyPathInOpenKeyTable(String key, long id,
+                                               boolean doAssert)
+          throws Exception {
+    String openKey = omMetadataManager.getOpenKey(volumeName, bucketName,
+            key, id);
+    OmKeyInfo omKeyInfo = omMetadataManager.getOpenKeyTable().get(openKey);
+    if (doAssert) {
+      Assert.assertNotNull("Failed to find key in OpenKeyTable", omKeyInfo);
+    }
+    return omKeyInfo;
+  }
+
+  /**
+   * Gets OMFileCreateRequest reference.
+   *
+   * @param omRequest om request
+   * @return OMFileCreateRequest reference
+   */
+  @NotNull
+  protected OMFileCreateRequest getOMFileCreateRequest(OMRequest omRequest){
+    return new OMFileCreateRequest(omRequest);
+  }
+
 }
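
The factory and lookup hooks above are the extension points for the V1 test
below; a likely override shape (assuming, by analogy with OMKeyCommitRequestV1,
a corresponding OMFileCreateRequestV1 request class for the V1 layout):

    @Override
    protected OMFileCreateRequest getOMFileCreateRequest(OMRequest omRequest) {
      // Hypothetical override: run the shared base-class tests through the
      // V1 (prefix-based) file create request implementation.
      return new OMFileCreateRequestV1(omRequest);
    }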
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMFileCreateRequestV1.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMFileCreateRequestV1.java
new file mode 100644
index 0000000..7ded386
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMFileCreateRequestV1.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.file;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.UUID;
+
+/**
+ * Tests OMFileCreateRequest layout version V1.
+ */
+public class TestOMFileCreateRequestV1 extends TestOMFileCreateRequest {
+
+  @Test
+  public void testValidateAndUpdateCacheWithNonRecursive() throws Exception {
+    testNonRecursivePath(UUID.randomUUID().toString(), false, false, false);
+    testNonRecursivePath("a/b", false, false, true);
+    Assert.assertEquals("Invalid metrics value", 0, omMetrics.getNumKeys());
+
+    // Create parent dirs for the path
+    TestOMRequestUtils.addParentsToDirTable(volumeName, bucketName,
+            "a/b/c", omMetadataManager);
+    String fileNameD = "d";
+    TestOMRequestUtils.addKeyToTable(false, volumeName, bucketName,
+            "a/b/c/" + fileNameD, 0L, HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.ONE, omMetadataManager);
+
+    // cannot create file if directory of same name exists
+    testNonRecursivePath("a/b/c", false, false, true);
+
+    // Delete the child key but retain the path "a/b/" in the key table
+    OmDirectoryInfo dirPathC = getDirInfo("a/b/c");
+    Assert.assertNotNull("Failed to find dir path: a/b/c", dirPathC);
+    String dbFileD = omMetadataManager.getOzonePathKey(
+            dirPathC.getObjectID(), fileNameD);
+    omMetadataManager.getKeyTable().delete(dbFileD);
+    omMetadataManager.getKeyTable().delete(dirPathC.getPath());
+
+    // can create non-recursive because parents already exist.
+    testNonRecursivePath("a/b/e", false, false, false);
+  }
+
+  @Test
+  public void testValidateAndUpdateCacheWithRecursiveAndOverWrite()
+          throws Exception {
+    String key = "c/d/e/f";
+    // Should be able to create the file even if the parent directories do not exist
+    testNonRecursivePath(key, false, true, false);
+    Assert.assertEquals("Invalid metrics value", 3, omMetrics.getNumKeys());
+
+    // Add the key to key table
+    OmDirectoryInfo omDirInfo = getDirInfo("c/d/e");
+    OmKeyInfo omKeyInfo =
+            TestOMRequestUtils.createOmKeyInfo(volumeName, bucketName, key,
+                    HddsProtos.ReplicationType.RATIS,
+                    HddsProtos.ReplicationFactor.ONE,
+                    omDirInfo.getObjectID() + 10,
+                    omDirInfo.getObjectID(), 100, Time.now());
+    TestOMRequestUtils.addFileToKeyTable(false, false,
+            "f", omKeyInfo, -1,
+            omDirInfo.getObjectID() + 10, omMetadataManager);
+
+    // Even if key exists, should be able to create file as overwrite is set
+    // to true
+    testNonRecursivePath(key, true, true, false);
+    testNonRecursivePath(key, false, true, true);
+  }
+
+  @Test
+  public void testValidateAndUpdateCacheWithNonRecursiveAndOverWrite()
+          throws Exception {
+    String parentDir = "c/d/e";
+    String fileName = "f";
+    String key = parentDir + "/" + fileName;
+    TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+            omMetadataManager);
+    // Create parent dirs for the path
+    long parentId = TestOMRequestUtils.addParentsToDirTable(volumeName,
+            bucketName, parentDir, omMetadataManager);
+
+    // Create the file (it gets added to the OpenKeyTable). This is the
+    // non-recursive case, so the parent dirs created above must already exist.
+    testNonRecursivePath(key, false, false, false);
+
+    OmKeyInfo omKeyInfo =
+            TestOMRequestUtils.createOmKeyInfo(volumeName, bucketName, key,
+                    HddsProtos.ReplicationType.RATIS,
+                    HddsProtos.ReplicationFactor.ONE,
+                    parentId + 1,
+                    parentId, 100, Time.now());
+    TestOMRequestUtils.addFileToKeyTable(false, false,
+            fileName, omKeyInfo, -1, 50, omMetadataManager);
+
+    // Even if key exists in KeyTable, should be able to create file as
+    // overwrite is set to true
+    testNonRecursivePath(key, true, false, false);
+    testNonRecursivePath(key, false, false, true);
+  }
+
+  protected OmKeyInfo verifyPathInOpenKeyTable(String key, long id,
+                                             boolean doAssert)
+          throws Exception {
+    long bucketId = TestOMRequestUtils.getBucketId(volumeName, bucketName,
+            omMetadataManager);
+    String[] pathComponents = StringUtils.split(key, '/');
+    long parentId = bucketId;
+    for (int indx = 0; indx < pathComponents.length; indx++) {
+      String pathElement = pathComponents[indx];
+      // Reached last component, which is file name
+      if (indx == pathComponents.length - 1) {
+        String dbOpenFileName = omMetadataManager.getOpenFileName(
+                parentId, pathElement, id);
+        OmKeyInfo omKeyInfo = omMetadataManager.getOpenKeyTable()
+                .get(dbOpenFileName);
+        if (doAssert) {
+          Assert.assertNotNull("Invalid key!", omKeyInfo);
+        }
+        return omKeyInfo;
+      } else {
+        // directory
+        String dbKey = omMetadataManager.getOzonePathKey(parentId,
+                pathElement);
+        OmDirectoryInfo dirInfo =
+                omMetadataManager.getDirectoryTable().get(dbKey);
+        parentId = dirInfo.getObjectID();
+      }
+    }
+    if (doAssert) {
+      Assert.fail("Invalid key!");
+    }
+    return null;
+  }
+
+  private OmDirectoryInfo getDirInfo(String key)
+          throws Exception {
+    long bucketId = TestOMRequestUtils.getBucketId(volumeName, bucketName,
+            omMetadataManager);
+    String[] pathComponents = StringUtils.split(key, '/');
+    long parentId = bucketId;
+    OmDirectoryInfo dirInfo = null;
+    for (int indx = 0; indx < pathComponents.length; indx++) {
+      String pathElement = pathComponents[indx];
+      // Every path component is expected to be a directory entry here
+      String dbKey = omMetadataManager.getOzonePathKey(parentId,
+              pathElement);
+      dirInfo =
+              omMetadataManager.getDirectoryTable().get(dbKey);
+      parentId = dirInfo.getObjectID();
+    }
+    return dirInfo;
+  }
+
+  @NotNull
+  @Override
+  protected OzoneConfiguration getOzoneConfiguration() {
+    OzoneConfiguration config = super.getOzoneConfiguration();
+    config.set(OMConfigKeys.OZONE_OM_LAYOUT_VERSION, "V1");
+    return config;
+  }
+
+  protected OMFileCreateRequest getOMFileCreateRequest(OMRequest omRequest) {
+    return new OMFileCreateRequestV1(omRequest);
+  }
+}
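
A quick sketch of what the V1 helpers above are exercising: under the new layout,
parent directories live in the DirTable keyed by their parent's object ID plus the
directory name, and open files live in the open-file table keyed by the parent
object ID, file name and client ID, so a key path is resolved level by level rather
than by its full /volume/bucket/key name. The snippet below restates that lookup
outside the test class; it is a sketch only (not part of the patch) and assumes the
OMMetadataManager helpers used above (getOzonePathKey, getDirectoryTable,
getOpenFileName) together with TestOMRequestUtils.getBucketId and the same imports
as TestOMFileCreateRequestV1.

    // Sketch: resolve the open-file entry for a path such as "a/b/c/f"
    // under the V1 layout.
    static OmKeyInfo lookupOpenFileV1(OMMetadataManager metadataMgr,
        String volume, String bucket, String keyPath, long clientId)
        throws Exception {
      // Start from the bucket's object ID; each directory level is keyed by
      // its parent object ID plus the directory name.
      long parentId = TestOMRequestUtils.getBucketId(volume, bucket, metadataMgr);
      String[] elements = StringUtils.split(keyPath, '/');
      for (int i = 0; i < elements.length - 1; i++) {
        String dbKey = metadataMgr.getOzonePathKey(parentId, elements[i]);
        OmDirectoryInfo dir = metadataMgr.getDirectoryTable().get(dbKey);
        parentId = dir.getObjectID();   // descend one level
      }
      // The open file itself is keyed by parent object ID, file name and client ID.
      String fileName = elements[elements.length - 1];
      String dbOpenFileKey =
          metadataMgr.getOpenFileName(parentId, fileName, clientId);
      return metadataMgr.getOpenKeyTable().get(dbOpenFileKey);
    }
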
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
index b327b76..09d499e 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
@@ -19,12 +19,15 @@
 
 package org.apache.hadoop.ozone.om.request.key;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.jetbrains.annotations.NotNull;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -48,6 +51,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
  */
 public class TestOMKeyCommitRequest extends TestOMKeyRequest {
 
+  private String parentDir;
+
   @Test
   public void testPreExecute() throws Exception {
     doPreExecute(createCommitKeyRequest());
@@ -56,20 +61,15 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
   @Test
   public void testValidateAndUpdateCache() throws Exception {
 
-    OMRequest modifiedOmRequest =
-        doPreExecute(createCommitKeyRequest());
+    OMRequest modifiedOmRequest = doPreExecute(createCommitKeyRequest());
 
     OMKeyCommitRequest omKeyCommitRequest =
-        new OMKeyCommitRequest(modifiedOmRequest);
+            getOmKeyCommitRequest(modifiedOmRequest);
 
     TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
         omMetadataManager);
 
-    TestOMRequestUtils.addKeyToTable(true, volumeName, bucketName, keyName,
-        clientID, replicationType, replicationFactor, omMetadataManager);
-
-    String ozoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
-        keyName);
+    String ozoneKey = addKeyToOpenKeyTable();
 
     // Key should not be there in key table, as validateAndUpdateCache is
     // still not called.
@@ -92,6 +92,8 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
     omKeyInfo = omMetadataManager.getKeyTable().get(ozoneKey);
 
     Assert.assertNotNull(omKeyInfo);
+    // DB keyInfo format
+    verifyKeyName(omKeyInfo);
 
     // Check modification time
 
@@ -107,7 +109,14 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
 
     Assert.assertEquals(locationInfoListFromCommitKeyRequest,
         omKeyInfo.getLatestVersionLocations().getLocationList());
+  }
 
+  @Test
+  public void testValidateAndUpdateCacheWithSubDirs() throws Exception {
+    parentDir = "dir1/dir2/dir3/";
+    keyName = parentDir + UUID.randomUUID().toString();
+
+    testValidateAndUpdateCache();
   }
 
   @Test
@@ -117,10 +126,9 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
         doPreExecute(createCommitKeyRequest());
 
     OMKeyCommitRequest omKeyCommitRequest =
-        new OMKeyCommitRequest(modifiedOmRequest);
+            getOmKeyCommitRequest(modifiedOmRequest);
 
-    String ozoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
-        keyName);
+    String ozoneKey = getOzonePathKey();
 
     // Key should not be there in key table, as validateAndUpdateCache is
     // still not called.
@@ -147,13 +155,11 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
         doPreExecute(createCommitKeyRequest());
 
     OMKeyCommitRequest omKeyCommitRequest =
-        new OMKeyCommitRequest(modifiedOmRequest);
-
+            getOmKeyCommitRequest(modifiedOmRequest);
 
     TestOMRequestUtils.addVolumeToDB(volumeName, OzoneConsts.OZONE,
         omMetadataManager);
-    String ozoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
-        keyName);
+    String ozoneKey = getOzonePathKey();
 
     // Key should not be there in key table, as validateAndUpdateCache is
     // still not called.
@@ -180,14 +186,12 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
         doPreExecute(createCommitKeyRequest());
 
     OMKeyCommitRequest omKeyCommitRequest =
-        new OMKeyCommitRequest(modifiedOmRequest);
-
+            getOmKeyCommitRequest(modifiedOmRequest);
 
     TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
         omMetadataManager);
 
-    String ozoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
-        keyName);
+    String ozoneKey = getOzonePathKey();
 
     // Key should not be there in key table, as validateAndUpdateCache is
     // still not called.
@@ -216,7 +220,7 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
   private OMRequest doPreExecute(OMRequest originalOMRequest) throws Exception {
 
     OMKeyCommitRequest omKeyCommitRequest =
-        new OMKeyCommitRequest(originalOMRequest);
+            getOmKeyCommitRequest(originalOMRequest);
 
     OMRequest modifiedOmRequest = omKeyCommitRequest.preExecute(ozoneManager);
 
@@ -294,4 +298,34 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
     return keyLocations;
   }
 
+  protected String getParentDir() {
+    return parentDir;
+  }
+
+  @NotNull
+  protected String getOzonePathKey() throws IOException {
+    return omMetadataManager.getOzoneKey(volumeName, bucketName,
+            keyName);
+  }
+
+  @NotNull
+  protected String addKeyToOpenKeyTable() throws Exception {
+    TestOMRequestUtils.addKeyToTable(true, volumeName, bucketName, keyName,
+            clientID, replicationType, replicationFactor, omMetadataManager);
+
+    return getOzonePathKey();
+  }
+
+  @NotNull
+  protected OMKeyCommitRequest getOmKeyCommitRequest(OMRequest omRequest) {
+    return new OMKeyCommitRequest(omRequest);
+  }
+
+  protected void verifyKeyName(OmKeyInfo omKeyInfo) {
+    Assert.assertEquals("Incorrect KeyName", keyName,
+            omKeyInfo.getKeyName());
+    String fileName = OzoneFSUtils.getFileName(keyName);
+    Assert.assertEquals("Incorrect FileName", fileName,
+            omKeyInfo.getFileName());
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequestV1.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequestV1.java
new file mode 100644
index 0000000..f5168e1
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequestV1.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.ozone.om.request.key;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.util.Time;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Assert;
+
+import java.io.IOException;
+
+/**
+ * Tests OMKeyCommitRequestV1 (layout version V1).
+ */
+public class TestOMKeyCommitRequestV1 extends TestOMKeyCommitRequest {
+
+  private long parentID = Long.MIN_VALUE;
+
+  private long getBucketID() throws IOException {
+    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
+    OmBucketInfo omBucketInfo =
+            omMetadataManager.getBucketTable().get(bucketKey);
+    if (omBucketInfo != null) {
+      return omBucketInfo.getObjectID();
+    }
+    // bucket doesn't exist in DB
+    return Long.MIN_VALUE;
+  }
+
+  @Override
+  protected String getOzonePathKey() throws IOException {
+    long bucketID = getBucketID();
+    String fileName = OzoneFSUtils.getFileName(keyName);
+    return omMetadataManager.getOzonePathKey(bucketID, fileName);
+  }
+
+  @Override
+  protected String addKeyToOpenKeyTable() throws Exception {
+    // need to initialize parentID
+    if (getParentDir() == null) {
+      parentID = getBucketID();
+    } else {
+      parentID = TestOMRequestUtils.addParentsToDirTable(volumeName,
+              bucketName, getParentDir(), omMetadataManager);
+    }
+    long objectId = 100;
+
+    OmKeyInfo omKeyInfoV1 =
+            TestOMRequestUtils.createOmKeyInfo(volumeName, bucketName, keyName,
+                    HddsProtos.ReplicationType.RATIS,
+                    HddsProtos.ReplicationFactor.ONE, objectId, parentID, 100,
+                    Time.now());
+
+    String fileName = OzoneFSUtils.getFileName(keyName);
+    TestOMRequestUtils.addFileToKeyTable(true, false,
+            fileName, omKeyInfoV1, clientID, txnLogId, omMetadataManager);
+
+    return omMetadataManager.getOzonePathKey(parentID, fileName);
+  }
+
+  @NotNull
+  @Override
+  protected OzoneConfiguration getOzoneConfiguration() {
+    OzoneConfiguration config = super.getOzoneConfiguration();
+    config.set(OMConfigKeys.OZONE_OM_LAYOUT_VERSION, "V1");
+    return config;
+  }
+
+  @NotNull
+  protected OMKeyCommitRequest getOmKeyCommitRequest(OMRequest omRequest) {
+    return new OMKeyCommitRequestV1(omRequest);
+  }
+
+  protected void verifyKeyName(OmKeyInfo omKeyInfo) {
+    // V1 format - stores fileName in the keyName DB field.
+    String fileName = OzoneFSUtils.getFileName(keyName);
+    Assert.assertEquals("Incorrect FileName", fileName,
+            omKeyInfo.getFileName());
+    Assert.assertEquals("Incorrect KeyName", fileName,
+            omKeyInfo.getKeyName());
+  }
+}
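
For the commit path, the difference between the two layouts comes down to the shape
of the DB key and of the stored key name, which is what getOzonePathKey() and
verifyKeyName() above encode. The fragment below is a sketch written against the
fixture fields of TestOMKeyCommitRequest/TestOMKeyCommitRequestV1 (omMetadataManager,
volumeName, bucketName) and the parentID resolved in addKeyToOpenKeyTable(); with
keyName = "dir1/dir2/dir3/file1" as in testValidateAndUpdateCacheWithSubDirs:

    // V0 (flat layout): the key table is keyed by the full volume/bucket/key
    // path and OmKeyInfo#getKeyName() returns the full key name.
    String v0DbKey = omMetadataManager.getOzoneKey(volumeName, bucketName, keyName);

    // V1 (prefix layout): the file table entry is keyed by the parent
    // directory's object ID plus the file name, and OmKeyInfo#getKeyName()
    // returns just "file1".
    String fileName = OzoneFSUtils.getFileName(keyName);   // "file1"
    String v1DbKey = omMetadataManager.getOzonePathKey(parentID, fileName);
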
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
index 116ba5c..5f15c50 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.om.KeyManagerImpl;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
+import org.jetbrains.annotations.NotNull;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -89,6 +90,7 @@ public class TestOMKeyRequest {
   protected long clientID;
   protected long scmBlockSize = 1000L;
   protected long dataSize;
+  protected long txnLogId = 100000L;
 
   // Just setting ozoneManagerDoubleBuffer which does nothing.
   protected OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper =
@@ -101,7 +103,7 @@ public class TestOMKeyRequest {
   public void setup() throws Exception {
     ozoneManager = Mockito.mock(OzoneManager.class);
     omMetrics = OMMetrics.create();
-    OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
+    OzoneConfiguration ozoneConfiguration = getOzoneConfiguration();
     ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
         folder.newFolder().getAbsolutePath());
     omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration);
@@ -169,6 +171,11 @@ public class TestOMKeyRequest {
         .thenReturn(new ResolvedBucket(volumeAndBucket, volumeAndBucket));
   }
 
+  @NotNull
+  protected OzoneConfiguration getOzoneConfiguration() {
+    return new OzoneConfiguration();
+  }
+
   @After
   public void stop() {
     omMetrics.unRegister();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/file/TestOMFileCreateResponseV1.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/file/TestOMFileCreateResponseV1.java
new file mode 100644
index 0000000..d2ab465
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/file/TestOMFileCreateResponseV1.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.file;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.om.response.key.OMKeyCreateResponse;
+import org.apache.hadoop.ozone.om.response.key.TestOMKeyCreateResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.util.Time;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Assert;
+
+/**
+ * Tests OMFileCreateResponseV1 (layout version V1).
+ */
+public class TestOMFileCreateResponseV1 extends TestOMKeyCreateResponse {
+
+  @NotNull
+  @Override
+  protected OmKeyInfo getOmKeyInfo() {
+    Assert.assertNotNull(omBucketInfo);
+    return TestOMRequestUtils.createOmKeyInfo(volumeName,
+            omBucketInfo.getBucketName(), keyName, replicationType,
+            replicationFactor,
+            omBucketInfo.getObjectID() + 1,
+            omBucketInfo.getObjectID(), 100, Time.now());
+  }
+
+  @NotNull
+  @Override
+  protected String getOpenKeyName() {
+    Assert.assertNotNull(omBucketInfo);
+    return omMetadataManager.getOpenFileName(
+            omBucketInfo.getObjectID(), keyName, clientID);
+  }
+
+  @NotNull
+  @Override
+  protected OMKeyCreateResponse getOmKeyCreateResponse(OmKeyInfo keyInfo,
+      OmVolumeArgs volumeArgs, OmBucketInfo bucketInfo, OMResponse response) {
+
+    return new OMFileCreateResponseV1(response, keyInfo, null, clientID,
+            volumeArgs, bucketInfo);
+  }
+
+  @NotNull
+  @Override
+  protected OzoneConfiguration getOzoneConfiguration() {
+    OzoneConfiguration config = super.getOzoneConfiguration();
+    config.set(OMConfigKeys.OZONE_OM_LAYOUT_VERSION, "V1");
+    return config;
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java
index ab425f2..a8b3147 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.om.response.key;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.util.Time;
+import org.jetbrains.annotations.NotNull;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -31,20 +32,23 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 /**
  * Tests OMKeyCommitResponse.
  */
+@SuppressWarnings("visibilitymodifier")
 public class TestOMKeyCommitResponse extends TestOMKeyResponse {
 
+  protected OmBucketInfo omBucketInfo;
+
   @Test
   public void testAddToDBBatch() throws Exception {
 
-    OmKeyInfo omKeyInfo = TestOMRequestUtils.createOmKeyInfo(volumeName,
-        bucketName, keyName, replicationType, replicationFactor);
     OmVolumeArgs omVolumeArgs = OmVolumeArgs.newBuilder()
         .setOwnerName(keyName).setAdminName(keyName)
         .setVolume(volumeName).setCreationTime(Time.now()).build();
-    OmBucketInfo omBucketInfo = OmBucketInfo.newBuilder()
+    omBucketInfo = OmBucketInfo.newBuilder()
         .setVolumeName(volumeName).setBucketName(bucketName)
         .setCreationTime(Time.now()).build();
 
+    OmKeyInfo omKeyInfo = getOmKeyInfo();
+
     OzoneManagerProtocolProtos.OMResponse omResponse =
         OzoneManagerProtocolProtos.OMResponse.newBuilder().setCommitKeyResponse(
             OzoneManagerProtocolProtos.CommitKeyResponse.getDefaultInstance())
@@ -54,17 +58,14 @@ public class TestOMKeyCommitResponse extends TestOMKeyResponse {
 
     // As during commit Key, entry will be already there in openKeyTable.
     // Adding it here.
-    TestOMRequestUtils.addKeyToTable(true, volumeName, bucketName, keyName,
-        clientID, replicationType, replicationFactor, omMetadataManager);
+    addKeyToOpenKeyTable();
 
-    String openKey = omMetadataManager.getOpenKey(volumeName, bucketName,
-        keyName, clientID);
+    String openKey = getOpenKeyName();
     Assert.assertTrue(omMetadataManager.getOpenKeyTable().isExist(openKey));
 
-    String ozoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
-        keyName);
-    OMKeyCommitResponse omKeyCommitResponse = new OMKeyCommitResponse(
-        omResponse, omKeyInfo, ozoneKey, openKey, omVolumeArgs, omBucketInfo);
+    String ozoneKey = getOzoneKey();
+    OMKeyCommitResponse omKeyCommitResponse = getOmKeyCommitResponse(
+            omVolumeArgs, omKeyInfo, omResponse, openKey, ozoneKey);
 
     omKeyCommitResponse.addToDBBatch(omMetadataManager, batchOperation);
 
@@ -73,8 +74,7 @@ public class TestOMKeyCommitResponse extends TestOMKeyResponse {
 
     // When key commit key is deleted from openKey table and added to keyTable.
     Assert.assertFalse(omMetadataManager.getOpenKeyTable().isExist(openKey));
-    Assert.assertTrue(omMetadataManager.getKeyTable().isExist(
-        omMetadataManager.getOzoneKey(volumeName, bucketName, keyName)));
+    Assert.assertTrue(omMetadataManager.getKeyTable().isExist(ozoneKey));
   }
 
   @Test
@@ -85,7 +85,7 @@ public class TestOMKeyCommitResponse extends TestOMKeyResponse {
     OmVolumeArgs omVolumeArgs = OmVolumeArgs.newBuilder()
         .setOwnerName(keyName).setAdminName(keyName)
         .setVolume(volumeName).setCreationTime(Time.now()).build();
-    OmBucketInfo omBucketInfo = OmBucketInfo.newBuilder()
+    omBucketInfo = OmBucketInfo.newBuilder()
         .setVolumeName(volumeName).setBucketName(bucketName)
         .setCreationTime(Time.now()).build();
 
@@ -96,18 +96,15 @@ public class TestOMKeyCommitResponse extends TestOMKeyResponse {
             .setCmdType(OzoneManagerProtocolProtos.Type.CommitKey)
             .build();
 
-    String openKey = omMetadataManager.getOpenKey(volumeName, bucketName,
-        keyName, clientID);
-    String ozoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
-        keyName);
+    String openKey = getOpenKeyName();
+    String ozoneKey = getOzoneKey();
 
-    OMKeyCommitResponse omKeyCommitResponse = new OMKeyCommitResponse(
-        omResponse, omKeyInfo, ozoneKey, openKey, omVolumeArgs, omBucketInfo);
+    OMKeyCommitResponse omKeyCommitResponse = getOmKeyCommitResponse(
+            omVolumeArgs, omKeyInfo, omResponse, openKey, ozoneKey);
 
     // As during commit Key, entry will be already there in openKeyTable.
     // Adding it here.
-    TestOMRequestUtils.addKeyToTable(true, volumeName, bucketName, keyName,
-        clientID, replicationType, replicationFactor, omMetadataManager);
+    addKeyToOpenKeyTable();
 
     Assert.assertTrue(omMetadataManager.getOpenKeyTable().isExist(openKey));
 
@@ -120,7 +117,30 @@ public class TestOMKeyCommitResponse extends TestOMKeyResponse {
     // As omResponse is error it is a no-op. So, entry should still be in
     // openKey table.
     Assert.assertTrue(omMetadataManager.getOpenKeyTable().isExist(openKey));
-    Assert.assertFalse(omMetadataManager.getKeyTable().isExist(
-        omMetadataManager.getOzoneKey(volumeName, bucketName, keyName)));
+    Assert.assertFalse(omMetadataManager.getKeyTable().isExist(ozoneKey));
+  }
+
+  protected void addKeyToOpenKeyTable() throws Exception {
+    TestOMRequestUtils.addKeyToTable(true, volumeName, bucketName, keyName,
+            clientID, replicationType, replicationFactor, omMetadataManager);
+  }
+
+  @NotNull
+  protected String getOzoneKey() {
+    Assert.assertNotNull(omBucketInfo);
+    return omMetadataManager.getOzoneKey(volumeName,
+            omBucketInfo.getBucketName(), keyName);
+  }
+
+  @NotNull
+  protected OMKeyCommitResponse getOmKeyCommitResponse(
+          OmVolumeArgs omVolumeArgs, OmKeyInfo omKeyInfo,
+          OzoneManagerProtocolProtos.OMResponse omResponse, String openKey,
+          String ozoneKey) {
+    Assert.assertNotNull(omBucketInfo);
+    return new OMKeyCommitResponse(
+            omResponse, omKeyInfo, ozoneKey, openKey, omVolumeArgs,
+            omBucketInfo);
   }
 }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseV1.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseV1.java
new file mode 100644
index 0000000..369faa9
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseV1.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.key;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.util.Time;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Assert;
+
+/**
+ * Tests OMKeyCommitResponse layout version V1.
+ */
+public class TestOMKeyCommitResponseV1 extends TestOMKeyCommitResponse {
+
+  @NotNull
+  protected OMKeyCommitResponse getOmKeyCommitResponse(
+          OmVolumeArgs omVolumeArgs, OmKeyInfo omKeyInfo,
+          OzoneManagerProtocolProtos.OMResponse omResponse, String openKey,
+          String ozoneKey) {
+    Assert.assertNotNull(omBucketInfo);
+    return new OMKeyCommitResponseV1(
+            omResponse, omKeyInfo, ozoneKey, openKey, omVolumeArgs,
+            omBucketInfo);
+  }
+
+  @NotNull
+  @Override
+  protected OmKeyInfo getOmKeyInfo() {
+    Assert.assertNotNull(omBucketInfo);
+    return TestOMRequestUtils.createOmKeyInfo(volumeName,
+            omBucketInfo.getBucketName(), keyName, replicationType,
+            replicationFactor,
+            omBucketInfo.getObjectID() + 1,
+            omBucketInfo.getObjectID(), 100, Time.now());
+  }
+
+  @Override
+  protected void addKeyToOpenKeyTable() throws Exception {
+    Assert.assertNotNull(omBucketInfo);
+    long parentID = omBucketInfo.getObjectID();
+    long objectId = parentID + 10;
+
+    OmKeyInfo omKeyInfoV1 =
+            TestOMRequestUtils.createOmKeyInfo(volumeName, bucketName, keyName,
+                    HddsProtos.ReplicationType.RATIS,
+                    HddsProtos.ReplicationFactor.ONE, objectId, parentID, 100,
+                    Time.now());
+
+    String fileName = OzoneFSUtils.getFileName(keyName);
+    TestOMRequestUtils.addFileToKeyTable(true, false,
+            fileName, omKeyInfoV1, clientID, txnLogId, omMetadataManager);
+  }
+
+  @NotNull
+  @Override
+  protected String getOpenKeyName() {
+    Assert.assertNotNull(omBucketInfo);
+    return omMetadataManager.getOpenFileName(
+            omBucketInfo.getObjectID(), keyName, clientID);
+  }
+
+  @NotNull
+  @Override
+  protected String getOzoneKey() {
+    Assert.assertNotNull(omBucketInfo);
+    return omMetadataManager.getOzonePathKey(omBucketInfo.getObjectID(),
+            keyName);
+  }
+
+  @NotNull
+  @Override
+  protected OzoneConfiguration getOzoneConfiguration() {
+    OzoneConfiguration config = super.getOzoneConfiguration();
+    config.set(OMConfigKeys.OZONE_OM_LAYOUT_VERSION, "V1");
+    return config;
+  }
+}
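
The response-side V1 tests above reuse the assertions of TestOMKeyCommitResponse
unchanged and only override how the DB keys and the response object are built. In
rough outline, and only as a sketch of the flow the base testAddToDBBatch() drives
through those hooks (the explicit batch commit mirrors what the base test does
between addToDBBatch and its assertions), a successful commit is expected to move
the entry from the open table to the key/file table:

    addKeyToOpenKeyTable();              // entry pre-exists in the open table
    String openKey = getOpenKeyName();   // V1: parent object ID + file name + client ID
    String ozoneKey = getOzoneKey();     // V1: parent object ID + file name

    omKeyCommitResponse.addToDBBatch(omMetadataManager, batchOperation);
    omMetadataManager.getStore().commitBatchOperation(batchOperation);

    // Commit removes the open entry and adds the committed key entry.
    Assert.assertFalse(omMetadataManager.getOpenKeyTable().isExist(openKey));
    Assert.assertTrue(omMetadataManager.getKeyTable().isExist(ozoneKey));
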
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCreateResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCreateResponse.java
index 6357000..8b2dc8a 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCreateResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCreateResponse.java
@@ -21,11 +21,11 @@ package org.apache.hadoop.ozone.om.response.key;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.util.Time;
+import org.jetbrains.annotations.NotNull;
 import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .CreateKeyResponse;
@@ -41,16 +41,15 @@ public class TestOMKeyCreateResponse extends TestOMKeyResponse {
   @Test
   public void testAddToDBBatch() throws Exception {
 
-    OmKeyInfo omKeyInfo = TestOMRequestUtils.createOmKeyInfo(volumeName,
-        bucketName, keyName, replicationType, replicationFactor);
-
     OmVolumeArgs omVolumeArgs = OmVolumeArgs.newBuilder()
         .setOwnerName(keyName).setAdminName(keyName)
         .setVolume(volumeName).setCreationTime(Time.now()).build();
-    OmBucketInfo omBucketInfo = OmBucketInfo.newBuilder()
+    omBucketInfo = OmBucketInfo.newBuilder()
         .setVolumeName(volumeName).setBucketName(bucketName)
         .setCreationTime(Time.now()).build();
 
+    OmKeyInfo omKeyInfo = getOmKeyInfo();
+
     OMResponse omResponse = OMResponse.newBuilder().setCreateKeyResponse(
                 CreateKeyResponse.getDefaultInstance())
             .setStatus(OzoneManagerProtocolProtos.Status.OK)
@@ -58,11 +57,11 @@ public class TestOMKeyCreateResponse extends TestOMKeyResponse {
             .build();
 
     OMKeyCreateResponse omKeyCreateResponse =
-        new OMKeyCreateResponse(omResponse, omKeyInfo, null, clientID,
-            omVolumeArgs, omBucketInfo);
+            getOmKeyCreateResponse(omKeyInfo, omVolumeArgs, omBucketInfo,
+                    omResponse);
+
+    String openKey = getOpenKeyName();
 
-    String openKey = omMetadataManager.getOpenKey(volumeName, bucketName,
-        keyName, clientID);
     Assert.assertFalse(omMetadataManager.getOpenKeyTable().isExist(openKey));
     omKeyCreateResponse.addToDBBatch(omMetadataManager, batchOperation);
 
@@ -74,16 +73,16 @@ public class TestOMKeyCreateResponse extends TestOMKeyResponse {
 
   @Test
   public void testAddToDBBatchWithErrorResponse() throws Exception {
-    OmKeyInfo omKeyInfo = TestOMRequestUtils.createOmKeyInfo(volumeName,
-        bucketName, keyName, replicationType, replicationFactor);
 
     OmVolumeArgs omVolumeArgs = OmVolumeArgs.newBuilder()
         .setOwnerName(keyName).setAdminName(keyName)
         .setVolume(volumeName).setCreationTime(Time.now()).build();
-    OmBucketInfo omBucketInfo = OmBucketInfo.newBuilder()
+    omBucketInfo = OmBucketInfo.newBuilder()
         .setVolumeName(volumeName).setBucketName(bucketName)
         .setCreationTime(Time.now()).build();
 
+    OmKeyInfo omKeyInfo = getOmKeyInfo();
+
     OMResponse omResponse = OMResponse.newBuilder().setCreateKeyResponse(
         CreateKeyResponse.getDefaultInstance())
         .setStatus(OzoneManagerProtocolProtos.Status.KEY_NOT_FOUND)
@@ -91,12 +90,11 @@ public class TestOMKeyCreateResponse extends TestOMKeyResponse {
         .build();
 
     OMKeyCreateResponse omKeyCreateResponse =
-        new OMKeyCreateResponse(omResponse, omKeyInfo, null, clientID,
-            omVolumeArgs, omBucketInfo);
+            getOmKeyCreateResponse(omKeyInfo, omVolumeArgs, omBucketInfo,
+                    omResponse);
 
     // Before calling addToDBBatch
-    String openKey = omMetadataManager.getOpenKey(volumeName, bucketName,
-        keyName, clientID);
+    String openKey = getOpenKeyName();
     Assert.assertFalse(omMetadataManager.getOpenKeyTable().isExist(openKey));
 
     omKeyCreateResponse.checkAndUpdateDB(omMetadataManager, batchOperation);
@@ -108,4 +106,12 @@ public class TestOMKeyCreateResponse extends TestOMKeyResponse {
     Assert.assertFalse(omMetadataManager.getOpenKeyTable().isExist(openKey));
 
   }
+
+  @NotNull
+  protected OMKeyCreateResponse getOmKeyCreateResponse(OmKeyInfo keyInfo,
+      OmVolumeArgs volumeArgs, OmBucketInfo bucketInfo, OMResponse response) {
+
+    return new OMKeyCreateResponse(response, keyInfo, null, clientID,
+            volumeArgs, bucketInfo);
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyResponse.java
index 1ad4c70..af5449d 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyResponse.java
@@ -20,6 +20,10 @@ package org.apache.hadoop.ozone.om.response.key;
 
 import java.util.UUID;
 
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.jetbrains.annotations.NotNull;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -49,11 +53,13 @@ public class TestOMKeyResponse {
   protected String keyName;
   protected HddsProtos.ReplicationFactor replicationFactor;
   protected HddsProtos.ReplicationType replicationType;
+  protected OmBucketInfo omBucketInfo;
   protected long clientID;
+  protected long txnLogId = 100000L;
 
   @Before
   public void setup() throws Exception {
-    OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
+    OzoneConfiguration ozoneConfiguration = getOzoneConfiguration();
     ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
         folder.newFolder().getAbsolutePath());
     omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration);
@@ -67,6 +73,23 @@ public class TestOMKeyResponse {
     clientID = 1000L;
   }
 
+  @NotNull
+  protected String getOpenKeyName() {
+    return omMetadataManager.getOpenKey(volumeName, bucketName, keyName,
+            clientID);
+  }
+
+  @NotNull
+  protected OmKeyInfo getOmKeyInfo() {
+    return TestOMRequestUtils.createOmKeyInfo(volumeName, bucketName, keyName,
+            replicationType, replicationFactor);
+  }
+
+  @NotNull
+  protected OzoneConfiguration getOzoneConfiguration() {
+    return new OzoneConfiguration();
+  }
+
   @After
   public void stop() {
     Mockito.framework().clearInlineMocks();

