You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ra...@apache.org on 2021/02/25 06:20:29 UTC

[ozone] 09/19: HDDS-2942. Putkey : create key table entries for intermediate directories in the key path (#1764)

This is an automated email from the ASF dual-hosted git repository.

rakeshr pushed a commit to branch HDDS-2939
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 6145c24b0d91fd8047d46c49963b6990fdb860ce
Author: Rakesh Radhakrishnan <ra...@apache.org>
AuthorDate: Mon Jan 11 20:03:55 2021 +0530

    HDDS-2942. Putkey : create key table entries for intermediate directories in the key path (#1764)
---
 .../apache/hadoop/ozone/om/TestObjectStoreV1.java  | 215 +++++++++++++++++++++
 .../om/ratis/utils/OzoneManagerRatisUtils.java     |   4 +
 .../om/request/file/OMFileCreateRequestV1.java     |   3 +-
 .../ozone/om/request/key/OMKeyCommitRequestV1.java |   5 +-
 .../ozone/om/request/key/OMKeyCreateRequest.java   |  22 ++-
 .../OMKeyCreateRequestV1.java}                     | 148 ++++++--------
 .../om/response/file/OMFileCreateResponseV1.java   |   3 +-
 .../om/response/key/OMAllocateBlockResponseV1.java |   1 -
 .../om/response/key/OMKeyCommitResponseV1.java     |   2 -
 ...kResponseV1.java => OMKeyCreateResponseV1.java} |  42 ++--
 .../om/request/key/TestOMKeyCreateRequest.java     |  53 ++---
 .../om/request/key/TestOMKeyCreateRequestV1.java   | 129 +++++++++++++
 .../key/TestOMAllocateBlockResponseV1.java         |   5 +-
 .../om/response/key/TestOMKeyCommitResponseV1.java |   3 +-
 ...ponseV1.java => TestOMKeyCreateResponseV1.java} |  77 +++-----
 15 files changed, 501 insertions(+), 211 deletions(-)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestObjectStoreV1.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestObjectStoreV1.java
new file mode 100644
index 0000000..c6ae4ca
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestObjectStoreV1.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.Assert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.UUID;
+
+public class TestObjectStoreV1 {
+
+  private static MiniOzoneCluster cluster = null;
+  private static OzoneConfiguration conf;
+  private static String clusterId;
+  private static String scmId;
+  private static String omId;
+
+  @Rule
+  public Timeout timeout = new Timeout(240000);
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   * <p>
+   * The OM layout version is set to V1 before the cluster is built.
+   * @throws Exception if setting up the cluster fails
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    clusterId = UUID.randomUUID().toString();
+    scmId = UUID.randomUUID().toString();
+    omId = UUID.randomUUID().toString();
+    conf.set(OMConfigKeys.OZONE_OM_LAYOUT_VERSION, "V1");
+    cluster = MiniOzoneCluster.newBuilder(conf)
+            .setClusterId(clusterId)
+            .setScmId(scmId)
+            .setOmId(omId)
+            .build();
+    cluster.waitForClusterToBeReady();
+  }
+
+  @Test
+  public void testCreateKey() throws Exception {
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    String parent = "a/b/c/";
+    String file = "key" + RandomStringUtils.randomNumeric(5);
+    String key = parent + file;
+
+    OzoneClient client = cluster.getClient();
+
+    ObjectStore objectStore = client.getObjectStore();
+    objectStore.createVolume(volumeName);
+
+    OzoneVolume ozoneVolume = objectStore.getVolume(volumeName);
+    Assert.assertTrue(ozoneVolume.getName().equals(volumeName));
+    ozoneVolume.createBucket(bucketName);
+
+    OzoneBucket ozoneBucket = ozoneVolume.getBucket(bucketName);
+    Assert.assertTrue(ozoneBucket.getName().equals(bucketName));
+
+    Table<String, OmKeyInfo> openKeyTable =
+            cluster.getOzoneManager().getMetadataManager().getOpenKeyTable();
+
+    // before file creation
+    verifyKeyInFileTable(openKeyTable, file, 0, true);
+
+    String data = "random data";
+    OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(key,
+            data.length(), ReplicationType.RATIS, ReplicationFactor.ONE,
+            new HashMap<>());
+
+    OmDirectoryInfo dirPathC = getDirInfo(volumeName, bucketName, parent);
+    Assert.assertNotNull("Failed to find dir path: a/b/c", dirPathC);
+
+    // after file creation
+    verifyKeyInOpenFileTable(openKeyTable, file, dirPathC.getObjectID(),
+            false);
+
+    ozoneOutputStream.write(data.getBytes(), 0, data.length());
+    ozoneOutputStream.close();
+
+    Table<String, OmKeyInfo> keyTable =
+            cluster.getOzoneManager().getMetadataManager().getKeyTable();
+
+    // After closing the file, its entry should be removed from openFileTable
+    // and added to fileTable.
+    verifyKeyInFileTable(keyTable, file, dirPathC.getObjectID(), false);
+    verifyKeyInOpenFileTable(openKeyTable, file, dirPathC.getObjectID(),
+            true);
+
+    ozoneBucket.deleteKey(key);
+
+    // after key delete
+    verifyKeyInFileTable(keyTable, file, dirPathC.getObjectID(), true);
+    verifyKeyInOpenFileTable(openKeyTable, file, dirPathC.getObjectID(),
+            true);
+  }
+
+  private OmDirectoryInfo getDirInfo(String volumeName, String bucketName,
+      String parentKey) throws Exception {
+    OMMetadataManager omMetadataManager =
+            cluster.getOzoneManager().getMetadataManager();
+    long bucketId = TestOMRequestUtils.getBucketId(volumeName, bucketName,
+            omMetadataManager);
+    String[] pathComponents = StringUtils.split(parentKey, '/');
+    long parentId = bucketId;
+    OmDirectoryInfo dirInfo = null;
+    for (int indx = 0; indx < pathComponents.length; indx++) {
+      String pathElement = pathComponents[indx];
+      String dbKey = omMetadataManager.getOzonePathKey(parentId,
+              pathElement);
+      dirInfo =
+              omMetadataManager.getDirectoryTable().get(dbKey);
+      parentId = dirInfo.getObjectID();
+    }
+    return dirInfo;
+  }
+
+  private void verifyKeyInFileTable(Table<String, OmKeyInfo> fileTable,
+      String fileName, long parentID, boolean isEmpty) throws IOException {
+    TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> iterator
+            = fileTable.iterator();
+
+    if (isEmpty) {
+      Assert.assertTrue("Table is not empty!", fileTable.isEmpty());
+    } else {
+      Assert.assertFalse("Table is empty!", fileTable.isEmpty());
+      while (iterator.hasNext()) {
+        Table.KeyValue<String, OmKeyInfo> next = iterator.next();
+        Assert.assertEquals("Invalid Key: " + next.getKey(),
+                parentID + "/" + fileName, next.getKey());
+        OmKeyInfo omKeyInfo = next.getValue();
+        Assert.assertEquals("Invalid Key", fileName,
+                omKeyInfo.getFileName());
+        Assert.assertEquals("Invalid Key", fileName,
+                omKeyInfo.getKeyName());
+        Assert.assertEquals("Invalid Key", parentID,
+                omKeyInfo.getParentObjectID());
+      }
+    }
+  }
+
+  private void verifyKeyInOpenFileTable(Table<String, OmKeyInfo> openFileTable,
+      String fileName, long parentID, boolean isEmpty) throws IOException {
+    TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> iterator
+            = openFileTable.iterator();
+
+    if (isEmpty) {
+      Assert.assertTrue("Table is not empty!", openFileTable.isEmpty());
+    } else {
+      Assert.assertFalse("Table is empty!", openFileTable.isEmpty());
+      while (iterator.hasNext()) {
+        Table.KeyValue<String, OmKeyInfo> next = iterator.next();
+        // Use startsWith because the key format is
+        // <parentID>/fileName/<clientID> and the clientID is not visible.
+        Assert.assertTrue("Invalid Key: " + next.getKey(),
+                next.getKey().startsWith(parentID + "/" + fileName));
+        OmKeyInfo omKeyInfo = next.getValue();
+        Assert.assertEquals("Invalid Key", fileName,
+                omKeyInfo.getFileName());
+        Assert.assertEquals("Invalid Key", fileName,
+                omKeyInfo.getKeyName());
+        Assert.assertEquals("Invalid Key", parentID,
+                omKeyInfo.getParentObjectID());
+      }
+    }
+  }
+
+  /**
+   * Shut down the MiniOzoneCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index 31120f1..c469c2c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.ozone.om.request.key.OMAllocateBlockRequestV1;
 import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequestV1;
 import org.apache.hadoop.ozone.om.request.key.OMKeyCreateRequest;
+import org.apache.hadoop.ozone.om.request.key.OMKeyCreateRequestV1;
 import org.apache.hadoop.ozone.om.request.key.OMKeyDeleteRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyDeleteRequestV1;
 import org.apache.hadoop.ozone.om.request.key.OMKeyPurgeRequest;
@@ -144,6 +145,9 @@ public final class OzoneManagerRatisUtils {
       }
       return new OMAllocateBlockRequest(omRequest);
     case CreateKey:
+      if (omLayoutVersionV1) {
+        return new OMKeyCreateRequestV1(omRequest);
+      }
       return new OMKeyCreateRequest(omRequest);
     case CommitKey:
       if (omLayoutVersionV1) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestV1.java
index e38908a..f35c9a5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestV1.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestV1.java
@@ -217,7 +217,8 @@ public class OMFileCreateRequestV1 extends OMFileCreateRequest {
           .setOpenVersion(openVersion).build())
           .setCmdType(Type.CreateFile);
       omClientResponse = new OMFileCreateResponseV1(omResponse.build(),
-              omFileInfo, missingParentInfos, clientID, omBucketInfo.copyObject());
+              omFileInfo, missingParentInfos, clientID,
+              omBucketInfo.copyObject());
 
       result = Result.SUCCESS;
     } catch (IOException ex) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestV1.java
index 1ea36cc..3c49610 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestV1.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestV1.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
-import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
@@ -91,7 +90,6 @@ public class OMKeyCommitRequestV1 extends OMKeyCommitRequest {
 
     IOException exception = null;
     OmKeyInfo omKeyInfo = null;
-    OmVolumeArgs omVolumeArgs = null;
     OmBucketInfo omBucketInfo = null;
     OMClientResponse omClientResponse = null;
     boolean bucketLockAcquired = false;
@@ -168,8 +166,7 @@ public class OMKeyCommitRequestV1 extends OMKeyCommitRequest {
       omBucketInfo.incrUsedBytes(correctedSpace);
 
       omClientResponse = new OMKeyCommitResponseV1(omResponse.build(),
-              omKeyInfo, dbFileKey, dbOpenFileKey, omVolumeArgs,
-              omBucketInfo.copyObject());
+              omKeyInfo, dbFileKey, dbOpenFileKey, omBucketInfo.copyObject());
 
       result = Result.SUCCESS;
     } catch (IOException ex) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
index 55f4990..68c0c36 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
@@ -349,24 +349,34 @@ public class OMKeyCreateRequest extends OMKeyRequest {
         OMAction.ALLOCATE_KEY, auditMap, exception,
         getOmRequest().getUserInfo()));
 
+    logResult(createKeyRequest, omMetrics, exception, result,
+            numMissingParents);
+
+    return omClientResponse;
+  }
+
+  protected void logResult(CreateKeyRequest createKeyRequest,
+      OMMetrics omMetrics, IOException exception, Result result,
+       int numMissingParents) {
     switch (result) {
     case SUCCESS:
       // Missing directories are created immediately, counting that here.
       // The metric for the key is incremented as part of the key commit.
       omMetrics.incNumKeys(numMissingParents);
-      LOG.debug("Key created. Volume:{}, Bucket:{}, Key:{}", volumeName,
-          bucketName, keyName);
+      LOG.debug("Key created. Volume:{}, Bucket:{}, Key:{}",
+              createKeyRequest.getKeyArgs().getVolumeName(),
+              createKeyRequest.getKeyArgs().getBucketName(),
+              createKeyRequest.getKeyArgs().getKeyName());
       break;
     case FAILURE:
       LOG.error("Key creation failed. Volume:{}, Bucket:{}, Key{}. " +
-          "Exception:{}", volumeName, bucketName, keyName, exception);
+          "Exception:{}", createKeyRequest.getKeyArgs().getVolumeName(),
+              createKeyRequest.getKeyArgs().getBucketName(),
+              createKeyRequest.getKeyArgs().getKeyName(), exception);
       break;
     default:
       LOG.error("Unrecognized Result for OMKeyCreateRequest: {}",
           createKeyRequest);
     }
-
-    return omClientResponse;
   }
-
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestV1.java
similarity index 66%
copy from hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestV1.java
copy to hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestV1.java
index e38908a..416e462 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestV1.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestV1.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.ozone.om.request.file;
+package org.apache.hadoop.ozone.om.request.key;
 
 import com.google.common.base.Optional;
 import org.apache.hadoop.ozone.audit.OMAction;
@@ -29,15 +29,15 @@ import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.file.OMDirectoryCreateRequestV1;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
-import org.apache.hadoop.ozone.om.response.file.OMFileCreateResponse;
-import org.apache.hadoop.ozone.om.response.file.OMFileCreateResponseV1;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateFileRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateFileResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.om.response.key.OMKeyCreateResponseV1;
+import org.apache.hadoop.ozone.om.response.key.OMKeyCreateResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateKeyResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
@@ -51,16 +51,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE;
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+import static org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.DIRECTORY_EXISTS;
+import static org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.FILE_EXISTS_IN_GIVENPATH;
 
 /**
- * Handles create file request layout version1.
+ * Handles CreateKey request layout version1.
  */
-public class OMFileCreateRequestV1 extends OMFileCreateRequest {
-
+public class OMKeyCreateRequestV1 extends OMKeyCreateRequest {
   private static final Logger LOG =
-      LoggerFactory.getLogger(OMFileCreateRequestV1.class);
-  public OMFileCreateRequestV1(OMRequest omRequest) {
+          LoggerFactory.getLogger(OMKeyCreateRequestV1.class);
+
+  public OMKeyCreateRequestV1(OMRequest omRequest) {
     super(omRequest);
   }
 
@@ -69,61 +72,42 @@ public class OMFileCreateRequestV1 extends OMFileCreateRequest {
   public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
       long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
 
-    CreateFileRequest createFileRequest = getOmRequest().getCreateFileRequest();
-    KeyArgs keyArgs = createFileRequest.getKeyArgs();
+    OzoneManagerProtocolProtos.CreateKeyRequest createKeyRequest =
+            getOmRequest().getCreateKeyRequest();
+
+    OzoneManagerProtocolProtos.KeyArgs keyArgs = createKeyRequest.getKeyArgs();
     Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
 
     String volumeName = keyArgs.getVolumeName();
     String bucketName = keyArgs.getBucketName();
     String keyName = keyArgs.getKeyName();
 
-    // if isRecursive is true, file would be created even if parent
-    // directories does not exist.
-    boolean isRecursive = createFileRequest.getIsRecursive();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("File create for : " + volumeName + "/" + bucketName + "/"
-          + keyName + ":" + isRecursive);
-    }
-
-    // if isOverWrite is true, file would be over written.
-    boolean isOverWrite = createFileRequest.getIsOverwrite();
-
     OMMetrics omMetrics = ozoneManager.getMetrics();
-    omMetrics.incNumCreateFile();
+    omMetrics.incNumKeyAllocates();
 
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
-
-    boolean acquiredLock = false;
-
     OmBucketInfo omBucketInfo = null;
     final List<OmKeyLocationInfo> locations = new ArrayList<>();
-    List<OmDirectoryInfo> missingParentInfos;
-    int numKeysCreated = 0;
 
+    boolean acquireLock = false;
     OMClientResponse omClientResponse = null;
-    OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
-        getOmRequest());
+    OzoneManagerProtocolProtos.OMResponse.Builder omResponse =
+            OmResponseUtil.getOMResponseBuilder(getOmRequest());
     IOException exception = null;
-    Result result = null;
+    Result result;
+    List<OmDirectoryInfo> missingParentInfos;
+    int numKeysCreated = 0;
     try {
       keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
       volumeName = keyArgs.getVolumeName();
       bucketName = keyArgs.getBucketName();
 
-      if (keyName.length() == 0) {
-        // Check if this is the root of the filesystem.
-        throw new OMException("Can not write to directory: " + keyName,
-                OMException.ResultCodes.NOT_A_FILE);
-      }
-
       // check Acl
       checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
-          IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
-
-      // acquire lock
-      acquiredLock = omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
-          volumeName, bucketName);
+              IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
 
+      acquireLock = omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
+              volumeName, bucketName);
       validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
 
       OmKeyInfo dbFileInfo = null;
@@ -144,12 +128,14 @@ public class OMFileCreateRequestV1 extends OMFileCreateRequest {
         }
       }
 
-      // check if the file or directory already existed in OM
-      checkDirectoryResult(keyName, isOverWrite,
-              pathInfoV1.getDirectoryResult());
-
-      if (!isRecursive) {
-        checkAllParentsExist(keyArgs, pathInfoV1);
+      // Check if a file or directory exists with same key name.
+      if (pathInfoV1.getDirectoryResult() == DIRECTORY_EXISTS) {
+        throw new OMException("Cannot write to " +
+                "directory. createIntermediateDirs behavior is enabled and " +
+                "hence / has special interpretation: " + keyName, NOT_A_FILE);
+      } else if (pathInfoV1.getDirectoryResult() == FILE_EXISTS_IN_GIVENPATH) {
+        throw new OMException("Can not create file: " + keyName +
+                " as there is already file in the given path", NOT_A_FILE);
       }
 
       // add all missing parents to dir table
@@ -162,7 +148,7 @@ public class OMFileCreateRequestV1 extends OMFileCreateRequest {
 
       // do open key
       OmBucketInfo bucketInfo = omMetadataManager.getBucketTable().get(
-          omMetadataManager.getBucketKey(volumeName, bucketName));
+              omMetadataManager.getBucketKey(volumeName, bucketName));
 
       OmKeyInfo omFileInfo = prepareFileInfo(omMetadataManager, keyArgs,
               dbFileInfo, keyArgs.getDataSize(), locations,
@@ -172,15 +158,15 @@ public class OMFileCreateRequestV1 extends OMFileCreateRequest {
               ozoneManager.isRatisEnabled());
 
       long openVersion = omFileInfo.getLatestVersionLocations().getVersion();
-      long clientID = createFileRequest.getClientID();
+      long clientID = createKeyRequest.getClientID();
       String dbOpenFileName = omMetadataManager.getOpenFileName(
               pathInfoV1.getLastKnownParentId(), pathInfoV1.getLeafNodeName(),
               clientID);
 
       // Append new blocks
       List<OmKeyLocationInfo> newLocationList = keyArgs.getKeyLocationsList()
-          .stream().map(OmKeyLocationInfo::getFromProtobuf)
-          .collect(Collectors.toList());
+              .stream().map(OmKeyLocationInfo::getFromProtobuf)
+              .collect(Collectors.toList());
       omFileInfo.appendNewBlocks(newLocationList, false);
 
       omBucketInfo = getBucketInfo(omMetadataManager, volumeName, bucketName);
@@ -210,51 +196,39 @@ public class OMFileCreateRequestV1 extends OMFileCreateRequest {
 
       // Prepare response. Sets user given full key name in the 'keyName'
       // attribute in response object.
-      int clientVersion = getOmRequest().getVersion();
-      omResponse.setCreateFileResponse(CreateFileResponse.newBuilder()
-          .setKeyInfo(omFileInfo.getProtobuf(keyName, clientVersion))
-          .setID(clientID)
-          .setOpenVersion(openVersion).build())
-          .setCmdType(Type.CreateFile);
-      omClientResponse = new OMFileCreateResponseV1(omResponse.build(),
-              omFileInfo, missingParentInfos, clientID, omBucketInfo.copyObject());
+      omResponse.setCreateKeyResponse(CreateKeyResponse.newBuilder()
+              .setKeyInfo(omFileInfo.getProtobuf(keyName))
+              .setID(clientID)
+              .setOpenVersion(openVersion).build())
+              .setCmdType(Type.CreateKey);
+      omClientResponse = new OMKeyCreateResponseV1(omResponse.build(),
+              omFileInfo, missingParentInfos, clientID,
+              omBucketInfo.copyObject());
 
       result = Result.SUCCESS;
     } catch (IOException ex) {
       result = Result.FAILURE;
       exception = ex;
-      omMetrics.incNumCreateFileFails();
-      omResponse.setCmdType(Type.CreateFile);
-      omClientResponse = new OMFileCreateResponse(createErrorOMResponse(
-            omResponse, exception));
+      omMetrics.incNumKeyAllocateFails();
+      omResponse.setCmdType(Type.CreateKey);
+      omClientResponse = new OMKeyCreateResponse(
+              createErrorOMResponse(omResponse, exception));
     } finally {
       addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
-          omDoubleBufferHelper);
-      if (acquiredLock) {
+              omDoubleBufferHelper);
+      if (acquireLock) {
         omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
-            bucketName);
+                bucketName);
       }
     }
 
     // Audit Log outside the lock
     auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
-        OMAction.CREATE_FILE, auditMap, exception,
-        getOmRequest().getUserInfo()));
-
-    switch (result) {
-    case SUCCESS:
-      omMetrics.incNumKeys(numKeysCreated);
-      LOG.debug("File created. Volume:{}, Bucket:{}, Key:{}", volumeName,
-          bucketName, keyName);
-      break;
-    case FAILURE:
-      LOG.error("File create failed. Volume:{}, Bucket:{}, Key{}.",
-          volumeName, bucketName, keyName, exception);
-      break;
-    default:
-      LOG.error("Unrecognized Result for OMFileCreateRequest: {}",
-          createFileRequest);
-    }
+            OMAction.ALLOCATE_KEY, auditMap, exception,
+            getOmRequest().getUserInfo()));
+
+    logResult(createKeyRequest, omMetrics, exception, result,
+            numKeysCreated);
 
     return omClientResponse;
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMFileCreateResponseV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMFileCreateResponseV1.java
index ccaaa6b..7325def 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMFileCreateResponseV1.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMFileCreateResponseV1.java
@@ -32,12 +32,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DIRECTORY_TABLE;
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_FILE_TABLE;
 
 /**
  * Response for create file request layout version V1.
  */
-@CleanupTableInfo(cleanupTables = OPEN_FILE_TABLE)
+@CleanupTableInfo(cleanupTables = {DIRECTORY_TABLE, OPEN_FILE_TABLE})
 public class OMFileCreateResponseV1 extends OMFileCreateResponse {
 
   private List<OmDirectoryInfo> parentDirInfos;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMAllocateBlockResponseV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMAllocateBlockResponseV1.java
index ef8b639..138cca1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMAllocateBlockResponseV1.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMAllocateBlockResponseV1.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseV1.java
index c0840e3..5f0a337 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseV1.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseV1.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
@@ -43,7 +42,6 @@ public class OMKeyCommitResponseV1 extends OMKeyCommitResponse {
   public OMKeyCommitResponseV1(@Nonnull OMResponse omResponse,
                                @Nonnull OmKeyInfo omKeyInfo,
                                String ozoneKeyName, String openKeyName,
-                               @Nonnull OmVolumeArgs omVolumeArgs,
                                @Nonnull OmBucketInfo omBucketInfo) {
     super(omResponse, omKeyInfo, ozoneKeyName, openKeyName,
             omBucketInfo);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMAllocateBlockResponseV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCreateResponseV1.java
similarity index 51%
copy from hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMAllocateBlockResponseV1.java
copy to hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCreateResponseV1.java
index ef8b639..59c7edf 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMAllocateBlockResponseV1.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCreateResponseV1.java
@@ -18,43 +18,31 @@
 
 package org.apache.hadoop.ozone.om.response.key;
 
-import org.apache.hadoop.hdds.utils.db.BatchOperation;
-import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
-import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.om.response.file.OMFileCreateResponseV1;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 
 import javax.annotation.Nonnull;
-import java.io.IOException;
+import java.util.List;
 
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DIRECTORY_TABLE;
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_FILE_TABLE;
 
 /**
- * Response for AllocateBlock request layout version V1.
+ * Response for CreateKey request layout version V1.
  */
-@CleanupTableInfo(cleanupTables = {OPEN_FILE_TABLE})
-public class OMAllocateBlockResponseV1 extends OMAllocateBlockResponse {
-
-  public OMAllocateBlockResponseV1(@Nonnull OMResponse omResponse,
-      @Nonnull OmKeyInfo omKeyInfo, long clientID,
-      @Nonnull OmBucketInfo omBucketInfo) {
-    super(omResponse, omKeyInfo, clientID, omBucketInfo);
-  }
-
-  @Override
-  public void addToDBBatch(OMMetadataManager omMetadataManager,
-      BatchOperation batchOperation) throws IOException {
-
-    OMFileRequest.addToOpenFileTable(omMetadataManager, batchOperation,
-            getOmKeyInfo(), getClientID());
-
-    // update bucket usedBytes.
-    omMetadataManager.getBucketTable().putWithBatch(batchOperation,
-            omMetadataManager.getBucketKey(getOmKeyInfo().getVolumeName(),
-                    getOmKeyInfo().getBucketName()), getOmBucketInfo());
+@CleanupTableInfo(cleanupTables = {DIRECTORY_TABLE, OPEN_FILE_TABLE})
+public class OMKeyCreateResponseV1 extends OMFileCreateResponseV1 {
+
+  public OMKeyCreateResponseV1(@Nonnull OMResponse omResponse,
+                               @Nonnull OmKeyInfo omKeyInfo,
+                               @Nonnull List<OmDirectoryInfo> parentDirInfos,
+                               long openKeySessionID,
+                               @Nonnull OmBucketInfo omBucketInfo) {
+    super(omResponse, omKeyInfo, parentDirInfos, openKeySessionID,
+            omBucketInfo);
   }
 }
-
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
index 7269957..3df6f38 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
@@ -74,7 +74,7 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
         doPreExecute(createKeyRequest(false, 0));
 
     OMKeyCreateRequest omKeyCreateRequest =
-        new OMKeyCreateRequest(modifiedOmRequest);
+            getOMKeyCreateRequest(modifiedOmRequest);
 
     // Add volume and bucket entries to DB.
     addVolumeAndBucketToDB(volumeName, bucketName,
@@ -82,8 +82,7 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
 
     long id = modifiedOmRequest.getCreateKeyRequest().getClientID();
 
-    String openKey = omMetadataManager.getOpenKey(volumeName, bucketName,
-        keyName, id);
+    String openKey = getOpenKey(id);
 
     // Before calling
     OmKeyInfo omKeyInfo = omMetadataManager.getOpenKeyTable().get(openKey);
@@ -138,7 +137,7 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
         doPreExecute(createKeyRequest(true, partNumber));
 
     OMKeyCreateRequest omKeyCreateRequest =
-        new OMKeyCreateRequest(modifiedOmRequest);
+            getOMKeyCreateRequest(modifiedOmRequest);
 
     // Add volume and bucket entries to DB.
     addVolumeAndBucketToDB(volumeName, bucketName,
@@ -178,7 +177,7 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
         doPreExecute(createKeyRequest(false, 0));
 
     OMKeyCreateRequest omKeyCreateRequest =
-        new OMKeyCreateRequest(modifiedOmRequest);
+            getOMKeyCreateRequest(modifiedOmRequest);
 
 
     long id = modifiedOmRequest.getCreateKeyRequest().getClientID();
@@ -217,13 +216,12 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
             false, 0));
 
     OMKeyCreateRequest omKeyCreateRequest =
-        new OMKeyCreateRequest(modifiedOmRequest);
+            getOMKeyCreateRequest(modifiedOmRequest);
 
 
     long id = modifiedOmRequest.getCreateKeyRequest().getClientID();
 
-    String openKey = omMetadataManager.getOpenKey(volumeName, bucketName,
-        keyName, id);
+    String openKey = getOpenKey(id);
 
     TestOMRequestUtils.addVolumeToDB(volumeName, OzoneConsts.OZONE,
         omMetadataManager);
@@ -248,8 +246,6 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
 
   }
 
-
-
   /**
    * This method calls preExecute and verify the modified request.
    * @param originalOMRequest
@@ -259,7 +255,7 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
   private OMRequest doPreExecute(OMRequest originalOMRequest) throws Exception {
 
     OMKeyCreateRequest omKeyCreateRequest =
-        new OMKeyCreateRequest(originalOMRequest);
+            getOMKeyCreateRequest(originalOMRequest);
 
     OMRequest modifiedOmRequest =
         omKeyCreateRequest.preExecute(ozoneManager);
@@ -349,7 +345,7 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
   @Test
   public void testKeyCreateWithFileSystemPathsEnabled() throws Exception {
 
-    OzoneConfiguration configuration = new OzoneConfiguration();
+    OzoneConfiguration configuration = getOzoneConfiguration();
     configuration.setBoolean(OZONE_OM_ENABLE_FILESYSTEM_PATHS, true);
     when(ozoneManager.getConfiguration()).thenReturn(configuration);
     when(ozoneManager.getEnableFileSystemPaths()).thenReturn(true);
@@ -367,8 +363,7 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
     createAndCheck(keyName);
 
     // Commit openKey entry.
-    TestOMRequestUtils.addKeyToTable(false, volumeName, bucketName,
-        keyName.substring(1), 0L, RATIS, THREE, omMetadataManager);
+    addToKeyTable(keyName);
 
     // Now create another file in same dir path.
     keyName = "/a/b/c/file2";
@@ -430,10 +425,15 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
 
   }
 
+  protected void addToKeyTable(String keyName) throws Exception {
+    TestOMRequestUtils.addKeyToTable(false, volumeName, bucketName,
+        keyName.substring(1), 0L, RATIS, THREE, omMetadataManager);
+  }
+
 
   private void checkNotAValidPath(String keyName) {
     OMRequest omRequest = createKeyRequest(false, 0, keyName);
-    OMKeyCreateRequest omKeyCreateRequest = new OMKeyCreateRequest(omRequest);
+    OMKeyCreateRequest omKeyCreateRequest = getOMKeyCreateRequest(omRequest);
 
     try {
       omKeyCreateRequest.preExecute(ozoneManager);
@@ -450,11 +450,11 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
   private void checkNotAFile(String keyName) throws Exception {
     OMRequest omRequest = createKeyRequest(false, 0, keyName);
 
-    OMKeyCreateRequest omKeyCreateRequest = new OMKeyCreateRequest(omRequest);
+    OMKeyCreateRequest omKeyCreateRequest = getOMKeyCreateRequest(omRequest);
 
     omRequest = omKeyCreateRequest.preExecute(ozoneManager);
 
-    omKeyCreateRequest = new OMKeyCreateRequest(omRequest);
+    omKeyCreateRequest = getOMKeyCreateRequest(omRequest);
 
     OMClientResponse omClientResponse =
         omKeyCreateRequest.validateAndUpdateCache(ozoneManager,
@@ -468,11 +468,11 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
   private void createAndCheck(String keyName) throws Exception {
     OMRequest omRequest = createKeyRequest(false, 0, keyName);
 
-    OMKeyCreateRequest omKeyCreateRequest = new OMKeyCreateRequest(omRequest);
+    OMKeyCreateRequest omKeyCreateRequest = getOMKeyCreateRequest(omRequest);
 
     omRequest = omKeyCreateRequest.preExecute(ozoneManager);
 
-    omKeyCreateRequest = new OMKeyCreateRequest(omRequest);
+    omKeyCreateRequest = getOMKeyCreateRequest(omRequest);
 
     OMClientResponse omClientResponse =
         omKeyCreateRequest.validateAndUpdateCache(ozoneManager,
@@ -483,7 +483,7 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
     checkCreatedPaths(omKeyCreateRequest, omRequest, keyName);
   }
 
-  private void checkCreatedPaths(OMKeyCreateRequest omKeyCreateRequest,
+  protected void checkCreatedPaths(OMKeyCreateRequest omKeyCreateRequest,
       OMRequest omRequest, String keyName) throws Exception {
     keyName = omKeyCreateRequest.validateAndNormalizeKey(true, keyName);
     // Check intermediate directories created or not.
@@ -497,9 +497,7 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
     Assert.assertNotNull(omKeyInfo);
   }
 
-
-
-  private void checkIntermediatePaths(Path keyPath) throws Exception {
+  protected long checkIntermediatePaths(Path keyPath) throws Exception {
     // Check intermediate paths are created
     keyPath = keyPath.getParent();
     while(keyPath != null) {
@@ -508,6 +506,15 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
               keyPath.toString())));
       keyPath = keyPath.getParent();
     }
+    return -1;
+  }
+
+  protected String getOpenKey(long id) throws IOException {
+    return omMetadataManager.getOpenKey(volumeName, bucketName,
+            keyName, id);
   }
 
+  protected OMKeyCreateRequest getOMKeyCreateRequest(OMRequest omRequest) {
+    return new OMKeyCreateRequest(omRequest);
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestV1.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestV1.java
new file mode 100644
index 0000000..83c640d
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestV1.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.key;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.util.Time;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Assert;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Iterator;
+
+/**
+ * Tests OMKeyCreateRequestV1 class.
+ */
+public class TestOMKeyCreateRequestV1 extends TestOMKeyCreateRequest {
+
+  @NotNull
+  @Override
+  protected OzoneConfiguration getOzoneConfiguration() {
+    OzoneConfiguration config = super.getOzoneConfiguration();
+    config.set(OMConfigKeys.OZONE_OM_LAYOUT_VERSION, "V1");
+    // The omLayoutVersionV1 flag is normally set by OzoneManager#start(),
+    // which is not invoked in this test. Hence it is set explicitly here
+    // so that the prefix tables are populated.
+    OzoneManagerRatisUtils.setOmLayoutVersionV1(true);
+    return config;
+  }
+
+  @Override
+  protected void addToKeyTable(String keyName) throws Exception {
+    // Also asserts that every parent directory of the key exists.
+    long parentId = checkIntermediatePaths(Paths.get(keyName));
+    String fileName = OzoneFSUtils.getFileName(keyName);
+    OmKeyInfo omKeyInfo = TestOMRequestUtils.createOmKeyInfo(
+            volumeName, bucketName, fileName,
+            HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.ONE,
+            parentId + 1, parentId, 100, Time.now());
+    TestOMRequestUtils.addFileToKeyTable(false, false,
+            fileName, omKeyInfo, -1, 50, omMetadataManager);
+  }
+
+  @Override
+  protected void checkCreatedPaths(OMKeyCreateRequest omKeyCreateRequest,
+      OMRequest omRequest, String keyName) throws Exception {
+    keyName = omKeyCreateRequest.validateAndNormalizeKey(true, keyName);
+    // Every intermediate directory must exist in the directory table.
+    Path keyPath = Paths.get(keyName);
+    long parentID = checkIntermediatePaths(keyPath);
+    // The open key entry is addressed by parent ID, file name and client ID.
+    String fileName = keyPath.getFileName().toString();
+    String openKey = omMetadataManager.getOpenFileName(parentID, fileName,
+            omRequest.getCreateKeyRequest().getClientID());
+    OmKeyInfo omKeyInfo = omMetadataManager.getOpenKeyTable().get(openKey);
+    Assert.assertNotNull(omKeyInfo);
+  }
+
+  @Override
+  protected long checkIntermediatePaths(Path keyPath) throws Exception {
+    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
+    OmBucketInfo omBucketInfo =
+            omMetadataManager.getBucketTable().get(bucketKey);
+    long lastKnownParentId = omBucketInfo.getObjectID();
+    Path parentPath = keyPath.getParent(); // skip the file name
+    if (parentPath == null) {
+      return lastKnownParentId; // key sits directly under the bucket
+    }
+    // Walk down from the bucket and check each directory was created.
+    Iterator<Path> elements = parentPath.iterator();
+    StringBuilder fullKeyPath = new StringBuilder(bucketKey);
+    while (elements.hasNext()) {
+      String fileName = elements.next().toString();
+      fullKeyPath.append(OzoneConsts.OM_KEY_PREFIX).append(fileName);
+      String dbNodeName = omMetadataManager.getOzonePathKey(
+              lastKnownParentId, fileName);
+      OmDirectoryInfo omDirInfo = omMetadataManager.getDirectoryTable().
+              get(dbNodeName);
+      Assert.assertNotNull("Parent key path:" + fullKeyPath +
+              " doesn't exist", omDirInfo);
+      lastKnownParentId = omDirInfo.getObjectID();
+    }
+    return lastKnownParentId;
+  }
+
+  @Override
+  protected String getOpenKey(long id) throws IOException {
+    OmBucketInfo omBucketInfo = omMetadataManager.getBucketTable().get(
+            omMetadataManager.getBucketKey(volumeName, bucketName));
+    // The bucket's object ID is used as the parent ID of the key.
+    // NOTE(review): 1000 looks like an arbitrary stand-in for tests that
+    // never add the bucket to the DB - confirm before relying on it.
+    long parentId =
+            (omBucketInfo != null) ? omBucketInfo.getObjectID() : 1000;
+    return omMetadataManager.getOpenFileName(parentId, keyName, id);
+  }
+
+  protected OMKeyCreateRequest getOMKeyCreateRequest(OMRequest omRequest) {
+    return new OMKeyCreateRequestV1(omRequest);
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMAllocateBlockResponseV1.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMAllocateBlockResponseV1.java
index e105a37..92b3efe 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMAllocateBlockResponseV1.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMAllocateBlockResponseV1.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
@@ -64,8 +63,8 @@ public class TestOMAllocateBlockResponseV1
 
   @NotNull
   protected OMAllocateBlockResponse getOmAllocateBlockResponse(
-          OmKeyInfo omKeyInfo, OmVolumeArgs omVolumeArgs,
-          OmBucketInfo omBucketInfo, OMResponse omResponse) {
+          OmKeyInfo omKeyInfo, OmBucketInfo omBucketInfo,
+          OMResponse omResponse) {
     return new OMAllocateBlockResponseV1(omResponse, omKeyInfo, clientID,
             omBucketInfo);
   }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseV1.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseV1.java
index 1e59ce8..4d68a4b 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseV1.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseV1.java
@@ -42,8 +42,7 @@ public class TestOMKeyCommitResponseV1 extends TestOMKeyCommitResponse {
           OzoneManagerProtocolProtos.OMResponse omResponse, String openKey,
           String ozoneKey) {
     Assert.assertNotNull(omBucketInfo);
-    return new OMKeyCommitResponseV1(
-            omResponse, omKeyInfo, ozoneKey, openKey, omVolumeArgs,
+    return new OMKeyCommitResponseV1(omResponse, omKeyInfo, ozoneKey, openKey,
             omBucketInfo);
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseV1.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCreateResponseV1.java
similarity index 60%
copy from hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseV1.java
copy to hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCreateResponseV1.java
index 1e59ce8..e51a06b 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseV1.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCreateResponseV1.java
@@ -19,61 +19,31 @@
 package org.apache.hadoop.ozone.om.response.key;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
-import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.util.Time;
 import org.jetbrains.annotations.NotNull;
 import org.junit.Assert;
 
 /**
- * Tests OMKeyCommitResponse layout version V1.
+ * Tests OMKeyCreateResponseV1.
  */
-public class TestOMKeyCommitResponseV1 extends TestOMKeyCommitResponse {
-
-  @NotNull
-  protected OMKeyCommitResponse getOmKeyCommitResponse(
-          OmVolumeArgs omVolumeArgs, OmKeyInfo omKeyInfo,
-          OzoneManagerProtocolProtos.OMResponse omResponse, String openKey,
-          String ozoneKey) {
-    Assert.assertNotNull(omBucketInfo);
-    return new OMKeyCommitResponseV1(
-            omResponse, omKeyInfo, ozoneKey, openKey, omVolumeArgs,
-            omBucketInfo);
-  }
+public class TestOMKeyCreateResponseV1 extends TestOMKeyCreateResponse {
 
   @NotNull
   @Override
-  protected OmKeyInfo getOmKeyInfo() {
-    Assert.assertNotNull(omBucketInfo);
-    return TestOMRequestUtils.createOmKeyInfo(volumeName,
-            omBucketInfo.getBucketName(), keyName, replicationType,
-            replicationFactor,
-            omBucketInfo.getObjectID() + 1,
-            omBucketInfo.getObjectID(), 100, Time.now());
-  }
-
-  @NotNull
-  @Override
-  protected void addKeyToOpenKeyTable() throws Exception {
-    Assert.assertNotNull(omBucketInfo);
-    long parentID = omBucketInfo.getObjectID();
-    long objectId = parentID + 10;
-
-    OmKeyInfo omKeyInfoV1 =
-            TestOMRequestUtils.createOmKeyInfo(volumeName, bucketName, keyName,
-                    HddsProtos.ReplicationType.RATIS,
-                    HddsProtos.ReplicationFactor.ONE, objectId, parentID, 100,
-                    Time.now());
-
-    String fileName = OzoneFSUtils.getFileName(keyName);
-    TestOMRequestUtils.addFileToKeyTable(true, false,
-            fileName, omKeyInfoV1, clientID, txnLogId, omMetadataManager);
+  protected OzoneConfiguration getOzoneConfiguration() {
+    OzoneConfiguration config = super.getOzoneConfiguration();
+    config.set(OMConfigKeys.OZONE_OM_LAYOUT_VERSION, "V1");
+    // The omLayoutVersionV1 flag is normally set by OzoneManager#start(),
+    // which is not invoked in this test. Hence it is set explicitly here
+    // so that the prefix tables are populated.
+    OzoneManagerRatisUtils.setOmLayoutVersionV1(true);
+    return config;
   }
 
   @NotNull
@@ -86,21 +56,20 @@ public class TestOMKeyCommitResponseV1 extends TestOMKeyCommitResponse {
 
   @NotNull
   @Override
-  protected String getOzoneKey() {
+  protected OmKeyInfo getOmKeyInfo() {
     Assert.assertNotNull(omBucketInfo);
-    return omMetadataManager.getOzonePathKey(omBucketInfo.getObjectID(),
-            keyName);
+    return TestOMRequestUtils.createOmKeyInfo(volumeName,
+            omBucketInfo.getBucketName(), keyName, replicationType,
+            replicationFactor,
+            omBucketInfo.getObjectID() + 1,
+            omBucketInfo.getObjectID(), 100, Time.now());
   }
 
   @NotNull
-  @Override
-  protected OzoneConfiguration getOzoneConfiguration() {
-    OzoneConfiguration config = super.getOzoneConfiguration();
-    config.set(OMConfigKeys.OZONE_OM_LAYOUT_VERSION, "V1");
-    // omLayoutVersionV1 flag will be set while invoking OzoneManager#start()
-    // and its not invoked in this test. Hence it is explicitly setting
-    // this configuration to populate prefix tables.
-    OzoneManagerRatisUtils.setOmLayoutVersionV1(true);
-    return config;
+  protected OMKeyCreateResponse getOmKeyCreateResponse(OmKeyInfo keyInfo,
+      OmBucketInfo bucketInfo, OMResponse response) {
+    // parentDirInfos is @Nonnull: pass an empty list, not null.
+    return new OMKeyCreateResponseV1(response, keyInfo,
+            java.util.Collections.emptyList(), clientID, bucketInfo);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org