You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ra...@apache.org on 2021/02/05 05:46:35 UTC

[ozone] branch HDDS-2939 updated: HDDS-4771. [FSO]S3MultiPart: Implement InitiateMultiPartUpload (#1877)

This is an automated email from the ASF dual-hosted git repository.

rakeshr pushed a commit to branch HDDS-2939
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-2939 by this push:
     new bc6d651  HDDS-4771. [FSO]S3MultiPart: Implement InitiateMultiPartUpload (#1877)
bc6d651 is described below

commit bc6d65198c35cf9fe46dc0a8426e1b30834c7a43
Author: Rakesh Radhakrishnan <ra...@apache.org>
AuthorDate: Fri Feb 5 11:16:15 2021 +0530

    HDDS-4771. [FSO]S3MultiPart: Implement InitiateMultiPartUpload (#1877)
---
 .../apache/hadoop/ozone/om/helpers/OmKeyInfo.java  |  32 ++-
 .../ozone/om/helpers/OmMultipartKeyInfo.java       |  63 ++++-
 .../rpc/TestOzoneClientMultipartUploadV1.java      | 182 ++++++++++++++
 .../src/main/proto/OmClientProtocol.proto          |   1 +
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |  10 +
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  26 +-
 .../hadoop/ozone/om/codec/OMDBDefinition.java      |  11 +-
 .../om/ratis/utils/OzoneManagerRatisUtils.java     |   4 +
 .../ozone/om/request/file/OMFileRequest.java       |  26 ++
 .../S3InitiateMultipartUploadRequestV1.java        | 271 +++++++++++++++++++++
 .../S3MultipartUploadCompleteRequest.java          |   5 +-
 .../S3InitiateMultipartUploadResponseV1.java       |  80 ++++++
 .../TestS3InitiateMultipartUploadRequestV1.java    | 186 ++++++++++++++
 .../s3/multipart/TestS3MultipartRequest.java       |  29 +++
 .../TestS3InitiateMultipartUploadResponse.java     |   2 +-
 .../TestS3InitiateMultipartUploadResponseV1.java   |  86 +++++++
 .../s3/multipart/TestS3MultipartResponse.java      |  46 ++++
 17 files changed, 1048 insertions(+), 12 deletions(-)

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
index 80151d7..93036c9 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
@@ -54,8 +54,36 @@ public final class OmKeyInfo extends WithObjectID {
   private HddsProtos.ReplicationType type;
   private HddsProtos.ReplicationFactor factor;
   private FileEncryptionInfo encInfo;
-  private String fileName; // leaf node name
-  private long parentObjectID; // pointer to parent directory
+
+  /**
+   * A pointer to parent directory used for path traversal. ParentID will be
+   * used only when the key is created into a FileSystemOptimized(FSO) bucket.
+   * <p>
+   * For example, if a key "a/b/key1" is created in an FSO bucket, then each
+   * path component will be assigned an ObjectId and linked to its parent path
+   * component using parent's objectID.
+   * <p>
+   * Say, Bucket's ObjectID = 512, which is the parent for its immediate child
+   * element.
+   * <p>
+   * ------------------------------------------|
+   * PathComponent |   ObjectID   |   ParentID |
+   * ------------------------------------------|
+   *      a        |     1024     |     512    |
+   * ------------------------------------------|
+   *      b        |     1025     |     1024   |
+   * ------------------------------------------|
+   *     key1      |     1026     |     1025   |
+   * ------------------------------------------|
+   */
+  private long parentObjectID;
+
+  /**
+   * Represents leaf node name. This will also be used when the key is
+   * created in a FileSystemOptimized(FSO) bucket. For example, if the user
+   * given keyName is "a/b/key1" then the fileName stores "key1".
+   */
+  private String fileName;
 
   /**
    * ACL Information.
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java
index df8751c..be37f93 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java
@@ -37,6 +37,30 @@ public class OmMultipartKeyInfo extends WithObjectID {
   private TreeMap<Integer, PartKeyInfo> partKeyInfoList;
 
   /**
+   * A pointer to parent directory used for path traversal. ParentID will be
+   * used only when the multipart key is created into a FileSystemOptimized(FSO)
+   * bucket.
+   * <p>
+   * For example, if key "a/b/multiKey1" is created in an FSO bucket, then each
+   * path component will be assigned an ObjectId and linked to its parent path
+   * component using parent's objectID.
+   * <p>
+   * Say, Bucket's ObjectID = 512, which is the parent for its immediate child
+   * element.
+   * <p>
+   * ------------------------------------------|
+   * PathComponent |   ObjectID   |   ParentID |
+   * ------------------------------------------|
+   *      a        |     1024     |     512    |
+   * ------------------------------------------|
+   *      b        |     1025     |     1024   |
+   * ------------------------------------------|
+   *   multiKey1   |     1026     |     1025   |
+   * ------------------------------------------|
+   */
+  private long parentID;
+
+  /**
    * Construct OmMultipartKeyInfo object which holds multipart upload
    * information for a key.
    */
@@ -53,6 +77,29 @@ public class OmMultipartKeyInfo extends WithObjectID {
   }
 
   /**
+   * Construct OmMultipartKeyInfo object which holds multipart upload
+   * information for a key, recording parentObjId as its parent ID.
+   */
+  @SuppressWarnings("parameternumber")
+  public OmMultipartKeyInfo(String id, long creationTime,
+      ReplicationType replicationType, ReplicationFactor replicationFactor,
+      Map<Integer, PartKeyInfo> list, long objectID, long updateID,
+      long parentObjId) {
+    this(id, creationTime, replicationType, replicationFactor, list,
+        objectID, updateID);
+    parentID = parentObjId;
+  }
+
+  /**
+   * Returns the parent ID stored for this multipart key.
+   *
+   * @return parent ID as long
+   */
+  public long getParentID() {
+    return this.parentID;
+  }
+
+  /**
    * Returns the uploadID for this multi part upload of a key.
    * @return uploadID
    */
@@ -95,6 +142,7 @@ public class OmMultipartKeyInfo extends WithObjectID {
     private TreeMap<Integer, PartKeyInfo> partKeyInfoList;
     private long objectID;
     private long updateID;
+    private long parentID;
 
     public Builder() {
       this.partKeyInfoList = new TreeMap<>();
@@ -144,9 +192,14 @@ public class OmMultipartKeyInfo extends WithObjectID {
       return this;
     }
 
+    public Builder setParentID(long parentObjId) {
+      this.parentID = parentObjId;
+      return this;
+    }
+
     public OmMultipartKeyInfo build() {
       return new OmMultipartKeyInfo(uploadID, creationTime, replicationType,
-          replicationFactor, partKeyInfoList, objectID, updateID);
+          replicationFactor, partKeyInfoList, objectID, updateID, parentID);
     }
   }
 
@@ -163,7 +216,7 @@ public class OmMultipartKeyInfo extends WithObjectID {
     return new OmMultipartKeyInfo(multipartKeyInfo.getUploadID(),
         multipartKeyInfo.getCreationTime(), multipartKeyInfo.getType(),
         multipartKeyInfo.getFactor(), list, multipartKeyInfo.getObjectID(),
-        multipartKeyInfo.getUpdateID());
+        multipartKeyInfo.getUpdateID(), multipartKeyInfo.getParentID());
   }
 
   /**
@@ -177,7 +230,8 @@ public class OmMultipartKeyInfo extends WithObjectID {
         .setType(replicationType)
         .setFactor(replicationFactor)
         .setObjectID(objectID)
-        .setUpdateID(updateID);
+        .setUpdateID(updateID)
+        .setParentID(parentID);
     partKeyInfoList.forEach((key, value) -> builder.addPartKeyInfoList(value));
     return builder.build();
   }
@@ -205,7 +259,8 @@ public class OmMultipartKeyInfo extends WithObjectID {
     // For partKeyInfoList we can do shallow copy here, as the PartKeyInfo is
     // immutable here.
     return new OmMultipartKeyInfo(uploadID, creationTime, replicationType,
-        replicationFactor, new TreeMap<>(partKeyInfoList), objectID, updateID);
+        replicationFactor, new TreeMap<>(partKeyInfoList), objectID, updateID,
+        parentID);
   }
 
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientMultipartUploadV1.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientMultipartUploadV1.java
new file mode 100644
index 0000000..93e5826
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientMultipartUploadV1.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.client.ReplicationType.STAND_ALONE;
+
+/**
+ * This test verifies all the S3 multipart client apis - layout version V1.
+ */
+public class TestOzoneClientMultipartUploadV1 {
+
+  private static ObjectStore store = null;
+  private static MiniOzoneCluster cluster = null;
+  private static OzoneClient ozClient = null;
+
+  private static String scmId = UUID.randomUUID().toString();
+
+  /**
+   * Set a timeout for each test.
+   */
+  @Rule
+  public Timeout timeout = new Timeout(300000);
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   * <p>
+   * Configures FS optimized paths with OM layout version V1 before starting.
+   *
+   * @throws Exception propagated from startCluster
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    TestOMRequestUtils.configureFSOptimizedPaths(conf,
+            true, OMConfigKeys.OZONE_OM_LAYOUT_VERSION_V1);
+    startCluster(conf);
+  }
+
+  /**
+   * Close OzoneClient and shutdown MiniOzoneCluster.
+   */
+  @AfterClass
+  public static void shutdown() throws IOException {
+    shutdownCluster();
+  }
+
+
+  /**
+   * Build a 3-datanode MiniOzoneCluster, wait for it, then open an RPC client.
+   * @param conf Configurations to start the cluster.
+   * @throws Exception propagated from cluster build/wait or client creation
+   */
+  static void startCluster(OzoneConfiguration conf) throws Exception {
+    cluster = MiniOzoneCluster.newBuilder(conf)
+            .setNumDatanodes(3)
+            .setTotalPipelineNumLimit(10)
+            .setScmId(scmId)
+            .build();
+    cluster.waitForClusterToBeReady();
+    ozClient = OzoneClientFactory.getRpcClient(conf);
+    store = ozClient.getObjectStore();
+  }
+
+  /**
+   * Close OzoneClient and shutdown MiniOzoneCluster, if they were created.
+   */
+  static void shutdownCluster() throws IOException {
+    if (ozClient != null) {
+      ozClient.close();
+    }
+
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /** Two initiates on one key with explicit replication give distinct IDs. */
+  @Test
+  public void testInitiateMultipartUploadWithReplicationInformationSet() throws
+          IOException {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+    OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
+            STAND_ALONE, ONE);
+
+    assertNotNull(multipartInfo);
+    String uploadID = multipartInfo.getUploadID();
+    Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
+    Assert.assertEquals(bucketName, multipartInfo.getBucketName());
+    Assert.assertEquals(keyName, multipartInfo.getKeyName());
+    assertNotNull(uploadID);
+
+    // Initiating again for the same key must yield a new, non-null uploadID
+    // (assertNotEquals takes the unexpected value first).
+    multipartInfo = bucket.initiateMultipartUpload(keyName, STAND_ALONE, ONE);
+
+    assertNotNull(multipartInfo);
+    Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
+    Assert.assertEquals(bucketName, multipartInfo.getBucketName());
+    Assert.assertEquals(keyName, multipartInfo.getKeyName());
+    assertNotNull(multipartInfo.getUploadID());
+    assertNotEquals(uploadID, multipartInfo.getUploadID());
+  }
+
+  /** Two initiates on one key with default replication give distinct IDs. */
+  @Test
+  public void testInitiateMultipartUploadWithDefaultReplication() throws
+          IOException {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+    OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName);
+
+    assertNotNull(multipartInfo);
+    String uploadID = multipartInfo.getUploadID();
+    Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
+    Assert.assertEquals(bucketName, multipartInfo.getBucketName());
+    Assert.assertEquals(keyName, multipartInfo.getKeyName());
+    assertNotNull(uploadID);
+
+    // Initiating again for the same key must yield a new, non-null uploadID
+    // (assertNotEquals takes the unexpected value first).
+    multipartInfo = bucket.initiateMultipartUpload(keyName);
+
+    assertNotNull(multipartInfo);
+    Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
+    Assert.assertEquals(bucketName, multipartInfo.getBucketName());
+    Assert.assertEquals(keyName, multipartInfo.getKeyName());
+    assertNotNull(multipartInfo.getUploadID());
+    assertNotEquals(uploadID, multipartInfo.getUploadID());
+  }
+}
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index d84014c..7f6ddf0 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -1096,6 +1096,7 @@ message MultipartKeyInfo {
     repeated PartKeyInfo partKeyInfoList = 5;
     optional uint64 objectID = 6;
     optional uint64 updateID = 7;
+    optional uint64 parentID = 8;
 }
 
 message PartKeyInfo {
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index caa8ed7..b354643 100644
--- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -425,4 +425,14 @@ public interface OMMetadataManager {
    * @return DB directory key as String.
    */
   String getOpenFileName(long parentObjectId, String fileName, long id);
+
+  /**
+   * Returns the DB key name of a multipart upload key in OM metadata store.
+   *
+   * @param parentObjectId - parent object Id
+   * @param fileName       - file name
+   * @param uploadId       - the upload id for this key
+   * @return DB key as String.
+   */
+  String getMultipartKey(long parentObjectId, String fileName, String uploadId);
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index cbf3e5c..1a90757 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -131,9 +131,11 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
    * |----------------------------------------------------------------------|
    * |  directoryTable    | parentId/directoryName -> DirectoryInfo         |
    * |----------------------------------------------------------------------|
-   * |  fileTable         | parentId/fileName -> KeyInfo                |
+   * |  fileTable         | parentId/fileName -> KeyInfo                    |
    * |----------------------------------------------------------------------|
-   * |  openFileTable     | parentId/fileName/id -> KeyInfo                   |
+   * |  openFileTable     | parentId/fileName/id -> KeyInfo                 |
+   * |----------------------------------------------------------------------|
+   * |  multipartFileInfoTable | parentId/fileName/uploadId ->...           |
    * |----------------------------------------------------------------------|
    * |  transactionInfoTable | #TRANSACTIONINFO -> OMTransactionInfo        |
    * |----------------------------------------------------------------------|
@@ -152,6 +154,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
   public static final String DIRECTORY_TABLE = "directoryTable";
   public static final String FILE_TABLE = "fileTable";
   public static final String OPEN_FILE_TABLE = "openFileTable";
+  public static final String MULTIPARTFILEINFO_TABLE = "multipartFileInfoTable";
   public static final String TRANSACTION_INFO_TABLE =
       "transactionInfoTable";
 
@@ -176,6 +179,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
   private Table transactionInfoTable;
   private boolean isRatisEnabled;
   private boolean ignorePipelineinKey;
+  private Table<String, OmMultipartKeyInfo> multipartFileInfoTable;
 
   // Epoch is used to generate the objectIDs. The most significant 2 bits of
   // objectIDs is set to this epoch. For clusters before HDDS-4315 there is
@@ -270,6 +274,9 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
 
   @Override
   public Table<String, OmMultipartKeyInfo> getMultipartInfoTable() {
+    if (OzoneManagerRatisUtils.isBucketFSOptimized()) {
+      return multipartFileInfoTable;
+    }
     return multipartInfoTable;
   }
 
@@ -363,6 +370,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
         .addTable(DIRECTORY_TABLE)
         .addTable(FILE_TABLE)
         .addTable(OPEN_FILE_TABLE)
+        .addTable(MULTIPARTFILEINFO_TABLE)
         .addTable(TRANSACTION_INFO_TABLE)
         .addCodec(OzoneTokenIdentifier.class, new TokenIdentifierCodec())
         .addCodec(OmKeyInfo.class, new OmKeyInfoCodec(true))
@@ -441,6 +449,10 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
             OmKeyInfo.class);
     checkTableStatus(openFileTable, OPEN_FILE_TABLE);
 
+    multipartFileInfoTable = this.store.getTable(MULTIPARTFILEINFO_TABLE,
+            String.class, OmMultipartKeyInfo.class);
+    checkTableStatus(multipartFileInfoTable, MULTIPARTFILEINFO_TABLE);
+
     transactionInfoTable = this.store.getTable(TRANSACTION_INFO_TABLE,
         String.class, OMTransactionInfo.class);
     checkTableStatus(transactionInfoTable, TRANSACTION_INFO_TABLE);
@@ -1221,4 +1233,14 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
     openKey.append(OM_KEY_PREFIX).append(id);
     return openKey.toString();
   }
+
+  @Override
+  public String getMultipartKey(long parentID, String fileName,
+                                String uploadId) {
+    // DB key layout:
+    // <parentID><OM_KEY_PREFIX><fileName><OM_KEY_PREFIX><uploadId>
+    return new StringBuilder().append(parentID)
+        .append(OM_KEY_PREFIX).append(fileName)
+        .append(OM_KEY_PREFIX).append(uploadId).toString();
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
index d025948..5362bac 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
@@ -170,6 +170,15 @@ public class OMDBDefinition implements DBDefinition {
                   OmKeyInfo.class,
                   new OmKeyInfoCodec(true));
 
+  public static final DBColumnFamilyDefinition<String, OmMultipartKeyInfo>
+          MULTIPART_FILEINFO_TABLE =
+          new DBColumnFamilyDefinition<>(
+                  OmMetadataManagerImpl.MULTIPARTFILEINFO_TABLE,
+                  String.class,
+                  new StringCodec(),
+                  OmMultipartKeyInfo.class,
+                  new OmMultipartKeyInfoCodec());
+
   @Override
   public String getName() {
     return OzoneConsts.OM_DB_NAME;
@@ -186,7 +195,7 @@ public class OMDBDefinition implements DBDefinition {
         VOLUME_TABLE, OPEN_KEY_TABLE, KEY_TABLE,
         BUCKET_TABLE, MULTIPART_INFO_TABLE, PREFIX_TABLE, DTOKEN_TABLE,
         S3_SECRET_TABLE, TRANSACTION_INFO_TABLE, DIRECTORY_TABLE,
-        FILE_TABLE, OPEN_FILE_TABLE};
+        FILE_TABLE, OPEN_FILE_TABLE, MULTIPART_FILEINFO_TABLE};
   }
 }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index 26fded2..8daa12b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.ozone.om.request.key.acl.prefix.OMPrefixAddAclRequest;
 import org.apache.hadoop.ozone.om.request.key.acl.prefix.OMPrefixRemoveAclRequest;
 import org.apache.hadoop.ozone.om.request.key.acl.prefix.OMPrefixSetAclRequest;
 import org.apache.hadoop.ozone.om.request.s3.multipart.S3InitiateMultipartUploadRequest;
+import org.apache.hadoop.ozone.om.request.s3.multipart.S3InitiateMultipartUploadRequestV1;
 import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadAbortRequest;
 import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCommitPartRequest;
 import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCompleteRequest;
@@ -182,6 +183,9 @@ public final class OzoneManagerRatisUtils {
     case PurgeKeys:
       return new OMKeyPurgeRequest(omRequest);
     case InitiateMultiPartUpload:
+      if (isBucketFSOptimized()) {
+        return new S3InitiateMultipartUploadRequestV1(omRequest);
+      }
       return new S3InitiateMultipartUploadRequest(omRequest);
     case CommitMultiPartUpload:
       return new S3MultipartUploadCommitPartRequest(omRequest);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
index aadc126..7f2d2c5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
@@ -456,6 +456,7 @@ public final class OMFileRequest {
       // For example, the user given key path is '/a/b/c/d/e/file1', then in DB
       // keyName field stores only the leaf node name, which is 'file1'.
       omFileInfo.setKeyName(fileName);
+      omFileInfo.setFileName(fileName);
       keyInfoOptional = Optional.of(omFileInfo);
     }
 
@@ -481,6 +482,7 @@ public final class OMFileRequest {
     // For example, the user given key path is '/a/b/c/d/e/file1', then in DB
     // keyName field stores only the leaf node name, which is 'file1'.
     omFileInfo.setKeyName(fileName);
+    omFileInfo.setFileName(fileName);
 
     omMetadataManager.getKeyTable().addCacheEntry(
             new CacheKey<>(dbFileKey),
@@ -511,6 +513,30 @@ public final class OMFileRequest {
   }
 
   /**
+   * Adding multipart omKeyInfo to open file table.
+   *
+   * @param omMetadataMgr OM Metadata Manager
+   * @param batchOp       batch of db operations
+   * @param omFileInfo    omKeyInfo
+   * @param uploadID      uploadID
+   * @return multipartFileKey
+   * @throws IOException DB failure
+   */
+  public static String addToOpenFileTable(OMMetadataManager omMetadataMgr,
+      BatchOperation batchOp, OmKeyInfo omFileInfo, String uploadID)
+          throws IOException {
+
+    String multipartFileKey = omMetadataMgr.getMultipartKey(
+            omFileInfo.getParentObjectID(), omFileInfo.getFileName(),
+            uploadID);
+
+    omMetadataMgr.getOpenKeyTable().putWithBatch(batchOp, multipartFileKey,
+            omFileInfo);
+
+    return multipartFileKey;
+  }
+
+  /**
    * Adding omKeyInfo to file table.
    *
    * @param omMetadataMgr
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestV1.java
new file mode 100644
index 0000000..3507090
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestV1.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.s3.multipart;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.file.OMDirectoryCreateRequestV1;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.s3.multipart.S3InitiateMultipartUploadResponse;
+import org.apache.hadoop.ozone.om.response.s3.multipart.S3InitiateMultipartUploadResponseV1;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartInfoInitiateRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartInfoInitiateResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_SUPPORTED_OPERATION;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+import static org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.DIRECTORY_EXISTS;
+
+/**
+ * Handles initiate multipart upload request for layout V1.
+ */
+public class S3InitiateMultipartUploadRequestV1
+        extends S3InitiateMultipartUploadRequest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(S3InitiateMultipartUploadRequestV1.class);
+
+  public S3InitiateMultipartUploadRequestV1(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  @SuppressWarnings("methodlength")
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long transactionLogIndex,
+      OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
+    MultipartInfoInitiateRequest multipartInfoInitiateRequest =
+        getOmRequest().getInitiateMultiPartUploadRequest();
+
+    KeyArgs keyArgs =
+        multipartInfoInitiateRequest.getKeyArgs();
+
+    Preconditions.checkNotNull(keyArgs.getMultipartUploadID());
+
+    Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
+
+    String volumeName = keyArgs.getVolumeName();
+    String bucketName = keyArgs.getBucketName();
+    final String requestedVolume = volumeName;
+    final String requestedBucket = bucketName;
+    String keyName = keyArgs.getKeyName();
+
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+
+    ozoneManager.getMetrics().incNumInitiateMultipartUploads();
+    boolean acquiredBucketLock = false;
+    IOException exception = null;
+    OmMultipartKeyInfo multipartKeyInfo = null;
+    OmKeyInfo omKeyInfo = null;
+    List<OmDirectoryInfo> missingParentInfos;
+    Result result = null;
+
+    OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
+        getOmRequest());
+    OMClientResponse omClientResponse = null;
+    try {
+      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
+      volumeName = keyArgs.getVolumeName();
+      bucketName = keyArgs.getBucketName();
+
+      // TODO to support S3 ACL later.
+      acquiredBucketLock =
+          omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
+              volumeName, bucketName);
+
+      validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+
+      // If KMS is configured and TDE is enabled on bucket, throw MPU not
+      // supported.
+      if (ozoneManager.getKmsProvider() != null) {
+        if (omMetadataManager.getBucketTable().get(
+            omMetadataManager.getBucketKey(volumeName, bucketName))
+            .getEncryptionKeyInfo() != null) {
+          throw new OMException("MultipartUpload is not yet supported on " +
+              "encrypted buckets", NOT_SUPPORTED_OPERATION);
+        }
+      }
+
+      OMFileRequest.OMPathInfoV1 pathInfoV1 =
+              OMFileRequest.verifyDirectoryKeysInPath(omMetadataManager,
+                      volumeName, bucketName, keyName, Paths.get(keyName));
+
+      // check if the directory already existed in OM
+      checkDirectoryResult(keyName, pathInfoV1.getDirectoryResult());
+
+      // add all missing parents to dir table
+      missingParentInfos =
+              OMDirectoryCreateRequestV1.getAllMissingParentDirInfo(
+                      ozoneManager, keyArgs, pathInfoV1, transactionLogIndex);
+
+      // The uploadId is made part of the key because multiple users may
+      // perform a multipart upload on the same key concurrently; whichever
+      // upload is finally committed is the key that is seen in Ozone.
+      // If the id were not added and the same key /volume/bucket/key were
+      // used by all of them, the parts uploaded by different users would be
+      // recorded under that single key, and the resulting key could be a
+      // mix of the parts from multiple users.
+
+      // So if multipart upload is initiated multiple times on the same key,
+      // multiple entries are stored in the openKey table.
+      // This matches AWS S3: every initiate multipart upload request
+      // returns a new uploadId, and a multipart upload id is returned even
+      // if the key already exists when the initiate multipart upload
+      // request is received.
+
+      String multipartKey = omMetadataManager.getMultipartKey(
+              pathInfoV1.getLastKnownParentId(), pathInfoV1.getLeafNodeName(),
+              keyArgs.getMultipartUploadID());
+
+      // Even if this key already exists in the KeyTable, it would be taken
+      // care of in the final complete multipart upload. AWS S3 behavior is
+      // also like this, even when key exists in a bucket, user can still
+      // initiate MPU.
+
+      multipartKeyInfo = new OmMultipartKeyInfo.Builder()
+          .setUploadID(keyArgs.getMultipartUploadID())
+          .setCreationTime(keyArgs.getModificationTime())
+          .setReplicationType(keyArgs.getType())
+          .setReplicationFactor(keyArgs.getFactor())
+          .setObjectID(pathInfoV1.getLeafNodeObjectId())
+          .setUpdateID(transactionLogIndex)
+          .setParentID(pathInfoV1.getLastKnownParentId())
+          .build();
+
+      omKeyInfo = new OmKeyInfo.Builder()
+          .setVolumeName(volumeName)
+          .setBucketName(bucketName)
+          .setKeyName(keyArgs.getKeyName())
+          .setCreationTime(keyArgs.getModificationTime())
+          .setModificationTime(keyArgs.getModificationTime())
+          .setReplicationType(keyArgs.getType())
+          .setReplicationFactor(keyArgs.getFactor())
+          .setOmKeyLocationInfos(Collections.singletonList(
+              new OmKeyLocationInfoGroup(0, new ArrayList<>())))
+          .setAcls(OzoneAclUtil.fromProtobuf(keyArgs.getAclsList()))
+          .setObjectID(pathInfoV1.getLeafNodeObjectId())
+          .setUpdateID(transactionLogIndex)
+          .setParentObjectID(pathInfoV1.getLastKnownParentId())
+          .build();
+
+      // Add cache entries for the prefix directories.
+      // Skip adding for the file key itself, until Key Commit.
+      OMFileRequest.addDirectoryTableCacheEntries(omMetadataManager,
+              Optional.absent(), Optional.of(missingParentInfos),
+              transactionLogIndex);
+
+      OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager,
+              multipartKey, omKeyInfo, pathInfoV1.getLeafNodeName(),
+              transactionLogIndex);
+
+      // Add to cache
+      omMetadataManager.getMultipartInfoTable().addCacheEntry(
+          new CacheKey<>(multipartKey),
+          new CacheValue<>(Optional.of(multipartKeyInfo), transactionLogIndex));
+
+      omClientResponse =
+          new S3InitiateMultipartUploadResponseV1(
+              omResponse.setInitiateMultiPartUploadResponse(
+                  MultipartInfoInitiateResponse.newBuilder()
+                      .setVolumeName(requestedVolume)
+                      .setBucketName(requestedBucket)
+                      .setKeyName(keyName)
+                      .setMultipartUploadID(keyArgs.getMultipartUploadID()))
+                  .build(), multipartKeyInfo, omKeyInfo, missingParentInfos);
+
+      result = Result.SUCCESS;
+    } catch (IOException ex) {
+      result = Result.FAILURE;
+      exception = ex;
+      omClientResponse = new S3InitiateMultipartUploadResponse(
+          createErrorOMResponse(omResponse, exception));
+    } finally {
+      addResponseToDoubleBuffer(transactionLogIndex, omClientResponse,
+          ozoneManagerDoubleBufferHelper);
+      if (acquiredBucketLock) {
+        omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK,
+            volumeName, bucketName);
+      }
+    }
+
+    // audit log
+    auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
+        OMAction.INITIATE_MULTIPART_UPLOAD, auditMap,
+        exception, getOmRequest().getUserInfo()));
+
+    switch (result) {
+    case SUCCESS:
+      LOG.debug("S3 InitiateMultipart Upload request for Key {} in " +
+              "Volume/Bucket {}/{} is successfully completed", keyName,
+          volumeName, bucketName);
+      break;
+    case FAILURE:
+      ozoneManager.getMetrics().incNumInitiateMultipartUploadFails();
+      LOG.error("S3 InitiateMultipart Upload request for Key {} in " +
+              "Volume/Bucket {}/{} is failed", keyName, volumeName, bucketName,
+          exception);
+      break;
+    default:
+      LOG.error("Unrecognized Result for S3InitiateMultipartUploadRequest: {}",
+          multipartInfoInitiateRequest);
+    }
+
+    return omClientResponse;
+  }
+
+  /**
+   * Verify om directory result.
+   *
+   * @param keyName           key name
+   * @param omDirectoryResult directory result
+   * @throws OMException if a directory already exists at the given key path
+   */
+  private void checkDirectoryResult(String keyName,
+      OMFileRequest.OMDirectoryResult omDirectoryResult) throws OMException {
+    if (omDirectoryResult == DIRECTORY_EXISTS) {
+      throw new OMException("Can not write to directory: " + keyName,
+              OMException.ResultCodes.NOT_A_FILE);
+    }
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
index dff022b..6d64b54 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
@@ -144,9 +144,10 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
       // Check for directory exists with same name, if it exists throw error. 
       if (ozoneManager.getEnableFileSystemPaths()) {
         if (checkDirectoryAlreadyExists(volumeName, bucketName, keyName,
-            omMetadataManager)) {
+                omMetadataManager)) {
           throw new OMException("Can not Complete MPU for file: " + keyName +
-              " as there is already directory in the given path", NOT_A_FILE);
+                  " as there is already directory in the given path",
+                  NOT_A_FILE);
         }
       }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3InitiateMultipartUploadResponseV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3InitiateMultipartUploadResponseV1.java
new file mode 100644
index 0000000..ff3e63f
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3InitiateMultipartUploadResponseV1.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.response.s3.multipart;
+
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DIRECTORY_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.MULTIPARTFILEINFO_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_FILE_TABLE;
+
+/**
+ * Response for S3 Initiate Multipart Upload request for layout V1.
+ */
+@CleanupTableInfo(cleanupTables = {DIRECTORY_TABLE, OPEN_FILE_TABLE,
+        MULTIPARTFILEINFO_TABLE})
+public class S3InitiateMultipartUploadResponseV1 extends
+        S3InitiateMultipartUploadResponse {
+  private final List<OmDirectoryInfo> parentDirInfos;
+
+  public S3InitiateMultipartUploadResponseV1(
+      @Nonnull OMResponse omResponse,
+      @Nonnull OmMultipartKeyInfo omMultipartKeyInfo,
+      @Nonnull OmKeyInfo omKeyInfo,
+      @Nonnull List<OmDirectoryInfo> parentDirInfos) {
+    super(omResponse, omMultipartKeyInfo, omKeyInfo);
+    this.parentDirInfos = parentDirInfos;
+  }
+
+  @Override
+  public void addToDBBatch(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+
+    // Create parent directory entries during MultiPartFileKey Create - do
+    // not wait for File Commit request. Each directory is stored in the
+    // directory table under the key returned by its getPath(); a null
+    // list is tolerated and simply skipped.
+    if (parentDirInfos != null) {
+      for (OmDirectoryInfo parentDirInfo : parentDirInfos) {
+        String parentKey = parentDirInfo.getPath();
+        omMetadataManager.getDirectoryTable().putWithBatch(batchOperation,
+                parentKey, parentDirInfo);
+      }
+    }
+    // The key returned for the open file entry is reused for multipart info.
+    String multipartFileKey =
+            OMFileRequest.addToOpenFileTable(omMetadataManager, batchOperation,
+                    getOmKeyInfo(), getOmMultipartKeyInfo().getUploadID());
+
+    omMetadataManager.getMultipartInfoTable().putWithBatch(batchOperation,
+            multipartFileKey, getOmMultipartKeyInfo());
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequestV1.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequestV1.java
new file mode 100644
index 0000000..dac2efe
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequestV1.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.request.s3.multipart;
+
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Tests S3 Initiate Multipart Upload request for layout V1.
+ */
+public class TestS3InitiateMultipartUploadRequestV1
+    extends TestS3InitiateMultipartUploadRequest {
+
+  @Test
+  public void testValidateAndUpdateCache() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String prefix = "a/b/c/";
+    List<String> dirs = new ArrayList<String>();
+    dirs.add("a");
+    dirs.add("b");
+    dirs.add("c");
+    String fileName = UUID.randomUUID().toString();
+    String keyName = prefix + fileName;
+
+    // Add volume and bucket to DB.
+    TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+        omMetadataManager);
+
+    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
+    OmBucketInfo omBucketInfo =
+            omMetadataManager.getBucketTable().get(bucketKey);
+    long bucketID = omBucketInfo.getObjectID();
+
+    OMRequest modifiedRequest = doPreExecuteInitiateMPUV1(volumeName,
+        bucketName, keyName);
+
+    S3InitiateMultipartUploadRequestV1 s3InitiateMultipartUploadRequestV1 =
+        new S3InitiateMultipartUploadRequestV1(modifiedRequest);
+
+    OMClientResponse omClientResponse =
+            s3InitiateMultipartUploadRequestV1.validateAndUpdateCache(
+                    ozoneManager, 100L,
+                    ozoneManagerDoubleBufferHelper);
+
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+            omClientResponse.getOMResponse().getStatus());
+
+    long parentID = verifyDirectoriesInDB(dirs, bucketID);
+
+    String multipartFileKey = omMetadataManager.getMultipartKey(parentID,
+            fileName, modifiedRequest.getInitiateMultiPartUploadRequest()
+                    .getKeyArgs().getMultipartUploadID());
+
+    OmKeyInfo omKeyInfo = omMetadataManager.getOpenKeyTable()
+            .get(multipartFileKey);
+    Assert.assertNotNull("Failed to find the fileInfo", omKeyInfo);
+    Assert.assertEquals("FileName mismatches!", fileName,
+            omKeyInfo.getKeyName());
+    Assert.assertEquals("ParentId mismatches!", parentID,
+            omKeyInfo.getParentObjectID());
+
+    OmMultipartKeyInfo omMultipartKeyInfo = omMetadataManager
+            .getMultipartInfoTable().get(multipartFileKey);
+    Assert.assertNotNull("Failed to find the multipartFileInfo",
+            omMultipartKeyInfo);
+    Assert.assertEquals("ParentId mismatches!", parentID,
+            omMultipartKeyInfo.getParentID());
+
+    Assert.assertEquals(modifiedRequest.getInitiateMultiPartUploadRequest()
+            .getKeyArgs().getMultipartUploadID(),
+        omMultipartKeyInfo
+            .getUploadID());
+
+    Assert.assertEquals(modifiedRequest.getInitiateMultiPartUploadRequest()
+        .getKeyArgs().getModificationTime(),
+        omKeyInfo
+        .getModificationTime());
+    Assert.assertEquals(modifiedRequest.getInitiateMultiPartUploadRequest()
+            .getKeyArgs().getModificationTime(),
+        omKeyInfo
+            .getCreationTime());
+  }
+
+  @Test
+  public void testValidateAndUpdateCacheWithBucketNotFound() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    TestOMRequestUtils.addVolumeToDB(volumeName, omMetadataManager);
+
+    OMRequest modifiedRequest = doPreExecuteInitiateMPUV1(
+        volumeName, bucketName, keyName);
+
+    S3InitiateMultipartUploadRequestV1 s3InitiateMultipartUploadRequestV1 =
+        new S3InitiateMultipartUploadRequestV1(modifiedRequest);
+
+    OMClientResponse omClientResponse =
+        s3InitiateMultipartUploadRequestV1.validateAndUpdateCache(
+            ozoneManager, 100L, ozoneManagerDoubleBufferHelper);
+
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.BUCKET_NOT_FOUND,
+        omClientResponse.getOMResponse().getStatus());
+
+    // The V1 request must fail bucket validation before it adds anything
+    // to the table caches, so neither the open key table nor the
+    // multipart info table may contain an entry afterwards.
+
+    Assert.assertTrue(omMetadataManager.getOpenKeyTable().isEmpty());
+    Assert.assertTrue(omMetadataManager.getMultipartInfoTable().isEmpty());
+  }
+
+  @Test
+  public void testValidateAndUpdateCacheWithVolumeNotFound() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    OMRequest modifiedRequest = doPreExecuteInitiateMPUV1(volumeName,
+        bucketName, keyName);
+
+    S3InitiateMultipartUploadRequestV1 s3InitiateMultipartUploadRequestV1 =
+        new S3InitiateMultipartUploadRequestV1(modifiedRequest);
+
+    OMClientResponse omClientResponse =
+        s3InitiateMultipartUploadRequestV1.validateAndUpdateCache(
+            ozoneManager, 100L, ozoneManagerDoubleBufferHelper);
+
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.VOLUME_NOT_FOUND,
+        omClientResponse.getOMResponse().getStatus());
+    // Nothing may have been added to the caches by the failed V1 request.
+    Assert.assertTrue(omMetadataManager.getOpenKeyTable().isEmpty());
+    Assert.assertTrue(omMetadataManager.getMultipartInfoTable().isEmpty());
+  }
+
+  private long verifyDirectoriesInDB(List<String> dirs, long bucketID)
+      throws IOException {
+    // The bucket is the parent of the first directory; every directory
+    // that is found becomes the parent of the next one.
+    long currentParentId = bucketID;
+    for (String dirName : dirs) {
+      String dirDbKey =
+          omMetadataManager.getOzonePathKey(currentParentId, dirName);
+      OmDirectoryInfo dirInfo =
+          omMetadataManager.getDirectoryTable().get(dirDbKey);
+      Assert.assertNotNull("Invalid directory!", dirInfo);
+      Assert.assertEquals("Invalid directory!", dirName, dirInfo.getName());
+      String expectedPath = currentParentId + "/" + dirName;
+      Assert.assertEquals("Invalid dir path!", expectedPath,
+          dirInfo.getPath());
+      currentParentId = dirInfo.getObjectID();
+    }
+    return currentParentId;
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
index f2c5b66..641ee8d 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
@@ -218,4 +218,33 @@ public class TestS3MultipartRequest {
   }
 
 
+  /**
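+   * Perform preExecute of the V1 Initiate Multipart upload request for the
+   * given volume, bucket and key name.
+   * @param volumeName volume name used to build the request
+   * @param bucketName bucket name used to build the request
+   * @param keyName key name used to build the request
+   * @return OMRequest - returned from preExecute.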
+   */
+  protected OMRequest doPreExecuteInitiateMPUV1(
+      String volumeName, String bucketName, String keyName) throws Exception {
+    OMRequest omRequest =
+            TestOMRequestUtils.createInitiateMPURequest(volumeName, bucketName,
+                    keyName);
+
+    S3InitiateMultipartUploadRequestV1 s3InitiateMultipartUploadRequestV1 =
+            new S3InitiateMultipartUploadRequestV1(omRequest);
+
+    OMRequest modifiedRequest =
+            s3InitiateMultipartUploadRequestV1.preExecute(ozoneManager);
+
+    Assert.assertNotEquals(omRequest, modifiedRequest);
+    Assert.assertTrue(modifiedRequest.hasInitiateMultiPartUploadRequest());
+    Assert.assertNotNull(modifiedRequest.getInitiateMultiPartUploadRequest()
+            .getKeyArgs().getMultipartUploadID());
+    Assert.assertTrue(modifiedRequest.getInitiateMultiPartUploadRequest()
+            .getKeyArgs().getModificationTime() > 0);
+
+    return modifiedRequest;
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3InitiateMultipartUploadResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3InitiateMultipartUploadResponse.java
index 4996bd0..03065ab 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3InitiateMultipartUploadResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3InitiateMultipartUploadResponse.java
@@ -31,7 +31,7 @@ public class TestS3InitiateMultipartUploadResponse
     extends TestS3MultipartResponse {
 
   @Test
-  public void addDBToBatch() throws Exception {
+  public void testAddDBToBatch() throws Exception {
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();
     String keyName = UUID.randomUUID().toString();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3InitiateMultipartUploadResponseV1.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3InitiateMultipartUploadResponseV1.java
new file mode 100644
index 0000000..31f9e5a
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3InitiateMultipartUploadResponseV1.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.response.s3.multipart;
+
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Class tests S3 Initiate MPU response.
+ */
+public class TestS3InitiateMultipartUploadResponseV1
+    extends TestS3InitiateMultipartUploadResponse {
+
+  @Test
+  public void testAddDBToBatch() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String prefix = "a/b/c/d/";
+    List<String> dirs = new ArrayList<String>();
+    dirs.add("a");
+    dirs.add("b");
+    dirs.add("c");
+    dirs.add("d");
+    String fileName = UUID.randomUUID().toString();
+    String keyName = prefix + fileName;
+
+    String multipartUploadID = UUID.randomUUID().toString();
+
+    long parentID = 1027; // assume objectID of dir path "a/b/c/d" is 1027
+    List<OmDirectoryInfo> parentDirInfos = new ArrayList<>();
+
+    S3InitiateMultipartUploadResponse s3InitiateMultipartUploadResponseV1 =
+            createS3InitiateMPUResponseV1(volumeName, bucketName, parentID,
+                    keyName, multipartUploadID, parentDirInfos);
+
+    s3InitiateMultipartUploadResponseV1.addToDBBatch(omMetadataManager,
+        batchOperation);
+
+    // Do manual commit and see whether addToBatch is successful or not.
+    omMetadataManager.getStore().commitBatchOperation(batchOperation);
+
+    String multipartKey = omMetadataManager.getMultipartKey(parentID, fileName,
+            multipartUploadID);
+
+    OmKeyInfo omKeyInfo = omMetadataManager.getOpenKeyTable().get(multipartKey);
+    Assert.assertNotNull("Failed to find the fileInfo", omKeyInfo);
+    Assert.assertEquals("FileName mismatches!", fileName,
+            omKeyInfo.getKeyName());
+    Assert.assertEquals("ParentId mismatches!", parentID,
+            omKeyInfo.getParentObjectID());
+
+    OmMultipartKeyInfo omMultipartKeyInfo = omMetadataManager
+            .getMultipartInfoTable().get(multipartKey);
+    Assert.assertNotNull("Failed to find the multipartFileInfo",
+            omMultipartKeyInfo);
+    Assert.assertEquals("ParentId mismatches!", parentID,
+            omMultipartKeyInfo.getParentID());
+
+    Assert.assertEquals("Upload Id mismatches!", multipartUploadID,
+            omMultipartKeyInfo.getUploadID());
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java
index 4f50d9e..76ceb0e 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.om.response.s3.multipart;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
 
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -30,9 +31,11 @@ import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .KeyInfo;
@@ -152,4 +155,47 @@ public class TestS3MultipartResponse {
             .setType(HddsProtos.ReplicationType.RATIS)
             .setFactor(HddsProtos.ReplicationFactor.ONE).build()).build();
   }
+
+
+  /** Builds an OK V1 initiate-MPU response for keyName under parentID. */
+  public S3InitiateMultipartUploadResponse createS3InitiateMPUResponseV1(
+      String volumeName, String bucketName, long parentID, String keyName,
+      String multipartUploadID, List<OmDirectoryInfo> parentDirInfos) {
+    // Both info objects share the same replication settings (RATIS / ONE).
+    HddsProtos.ReplicationType replType = HddsProtos.ReplicationType.RATIS;
+    HddsProtos.ReplicationFactor replFactor = HddsProtos.ReplicationFactor.ONE;
+
+    OmMultipartKeyInfo mpuKeyInfo = new OmMultipartKeyInfo.Builder()
+        .setUploadID(multipartUploadID).setCreationTime(Time.now())
+        .setReplicationType(replType).setReplicationFactor(replFactor)
+        .setParentID(parentID).build();
+
+    // Key name and file name are both OzoneFSUtils.getFileName(keyName).
+    String leafName = OzoneFSUtils.getFileName(keyName);
+    List<OmKeyLocationInfoGroup> locationVersions = Collections.singletonList(
+        new OmKeyLocationInfoGroup(0, new ArrayList<>()));
+    OmKeyInfo leafKeyInfo = new OmKeyInfo.Builder()
+        .setVolumeName(volumeName).setBucketName(bucketName)
+        .setKeyName(leafName).setFileName(leafName)
+        .setCreationTime(Time.now()).setModificationTime(Time.now())
+        .setReplicationType(replType).setReplicationFactor(replFactor)
+        .setOmKeyLocationInfos(locationVersions)
+        .setParentObjectID(parentID).build();
+
+    // The response payload keeps the full keyName, not the file name.
+    OzoneManagerProtocolProtos.MultipartInfoInitiateResponse.Builder
+        initiatePayload = OzoneManagerProtocolProtos
+        .MultipartInfoInitiateResponse.newBuilder()
+        .setVolumeName(volumeName).setBucketName(bucketName)
+        .setKeyName(keyName).setMultipartUploadID(multipartUploadID);
+
+    OMResponse okResponse = OMResponse.newBuilder()
+        .setCmdType(OzoneManagerProtocolProtos.Type.InitiateMultiPartUpload)
+        .setStatus(OzoneManagerProtocolProtos.Status.OK).setSuccess(true)
+        .setInitiateMultiPartUploadResponse(initiatePayload).build();
+
+    return new S3InitiateMultipartUploadResponseV1(okResponse, mpuKeyInfo,
+        leafKeyInfo, parentDirInfos);
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org