You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ra...@apache.org on 2021/02/25 06:20:38 UTC

[ozone] 18/19: HDDS-4813. [FSO]S3Multipart: Implement UploadCompleteRequest (#1923)

This is an automated email from the ASF dual-hosted git repository.

rakeshr pushed a commit to branch HDDS-2939
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit fd393dacf9b453f4429b2cab9f15d5b9dcda4555
Author: Rakesh Radhakrishnan <ra...@apache.org>
AuthorDate: Wed Feb 17 20:50:05 2021 +0530

    HDDS-4813. [FSO]S3Multipart: Implement UploadCompleteRequest (#1923)
---
 .../rpc/TestOzoneClientMultipartUploadV1.java      | 274 ++++++++++++++--
 .../om/ratis/utils/OzoneManagerRatisUtils.java     |   4 +
 .../ozone/om/request/file/OMFileRequest.java       |   4 +-
 .../ozone/om/request/key/OMKeyCommitRequestV1.java |   2 +-
 .../S3MultipartUploadCommitPartRequestV1.java      |  39 ++-
 .../S3MultipartUploadCompleteRequest.java          | 348 +++++++++++++--------
 .../S3MultipartUploadCompleteRequestV1.java        | 268 ++++++++++++++++
 .../S3MultipartUploadCompleteResponse.java         |  13 +-
 ...va => S3MultipartUploadCompleteResponseV1.java} |  60 ++--
 .../s3/multipart/TestS3MultipartRequest.java       |   9 +-
 .../TestS3MultipartUploadCompleteRequest.java      | 118 ++++++-
 .../TestS3MultipartUploadCompleteRequestV1.java    | 132 ++++++++
 .../s3/multipart/TestS3MultipartResponse.java      |  21 ++
 .../TestS3MultipartUploadCompleteResponseV1.java   | 257 +++++++++++++++
 14 files changed, 1330 insertions(+), 219 deletions(-)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientMultipartUploadV1.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientMultipartUploadV1.java
index af241c5..1ab2cc3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientMultipartUploadV1.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientMultipartUploadV1.java
@@ -17,24 +17,29 @@
 
 package org.apache.hadoop.ozone.client.rpc;
 
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneTestUtils;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.StringUtils.string2Bytes;
 import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
 
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
 import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -44,10 +49,15 @@ import org.junit.Test;
 import org.junit.rules.Timeout;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.UUID;
 
 import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
 import static org.apache.hadoop.hdds.client.ReplicationType.STAND_ALONE;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR;
 
 /**
  * This test verifies all the S3 multipart client apis - layout version V1.
@@ -133,24 +143,24 @@ public class TestOzoneClientMultipartUploadV1 {
     OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
             STAND_ALONE, ONE);
 
-    assertNotNull(multipartInfo);
+    Assert.assertNotNull(multipartInfo);
     String uploadID = multipartInfo.getUploadID();
     Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
     Assert.assertEquals(bucketName, multipartInfo.getBucketName());
     Assert.assertEquals(keyName, multipartInfo.getKeyName());
-    assertNotNull(multipartInfo.getUploadID());
+    Assert.assertNotNull(multipartInfo.getUploadID());
 
     // Call initiate multipart upload for the same key again, this should
     // generate a new uploadID.
     multipartInfo = bucket.initiateMultipartUpload(keyName,
             STAND_ALONE, ONE);
 
-    assertNotNull(multipartInfo);
+    Assert.assertNotNull(multipartInfo);
     Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
     Assert.assertEquals(bucketName, multipartInfo.getBucketName());
     Assert.assertEquals(keyName, multipartInfo.getKeyName());
-    assertNotEquals(multipartInfo.getUploadID(), uploadID);
-    assertNotNull(multipartInfo.getUploadID());
+    Assert.assertNotEquals(multipartInfo.getUploadID(), uploadID);
+    Assert.assertNotNull(multipartInfo.getUploadID());
   }
 
   @Test
@@ -166,23 +176,23 @@ public class TestOzoneClientMultipartUploadV1 {
     OzoneBucket bucket = volume.getBucket(bucketName);
     OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName);
 
-    assertNotNull(multipartInfo);
+    Assert.assertNotNull(multipartInfo);
     String uploadID = multipartInfo.getUploadID();
     Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
     Assert.assertEquals(bucketName, multipartInfo.getBucketName());
     Assert.assertEquals(keyName, multipartInfo.getKeyName());
-    assertNotNull(multipartInfo.getUploadID());
+    Assert.assertNotNull(multipartInfo.getUploadID());
 
     // Call initiate multipart upload for the same key again, this should
     // generate a new uploadID.
     multipartInfo = bucket.initiateMultipartUpload(keyName);
 
-    assertNotNull(multipartInfo);
+    Assert.assertNotNull(multipartInfo);
     Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
     Assert.assertEquals(bucketName, multipartInfo.getBucketName());
     Assert.assertEquals(keyName, multipartInfo.getKeyName());
-    assertNotEquals(multipartInfo.getUploadID(), uploadID);
-    assertNotNull(multipartInfo.getUploadID());
+    Assert.assertNotEquals(multipartInfo.getUploadID(), uploadID);
+    Assert.assertNotNull(multipartInfo.getUploadID());
   }
 
   @Test
@@ -199,12 +209,12 @@ public class TestOzoneClientMultipartUploadV1 {
     OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
             STAND_ALONE, ONE);
 
-    assertNotNull(multipartInfo);
+    Assert.assertNotNull(multipartInfo);
     String uploadID = multipartInfo.getUploadID();
     Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
     Assert.assertEquals(bucketName, multipartInfo.getBucketName());
     Assert.assertEquals(keyName, multipartInfo.getKeyName());
-    assertNotNull(multipartInfo.getUploadID());
+    Assert.assertNotNull(multipartInfo.getUploadID());
 
     OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName,
             sampleData.length(), 1, uploadID);
@@ -214,8 +224,8 @@ public class TestOzoneClientMultipartUploadV1 {
     OmMultipartCommitUploadPartInfo commitUploadPartInfo = ozoneOutputStream
             .getCommitUploadPartInfo();
 
-    assertNotNull(commitUploadPartInfo);
-    assertNotNull(commitUploadPartInfo.getPartName());
+    Assert.assertNotNull(commitUploadPartInfo);
+    Assert.assertNotNull(commitUploadPartInfo.getPartName());
   }
 
   @Test
@@ -233,12 +243,12 @@ public class TestOzoneClientMultipartUploadV1 {
     OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
             ReplicationType.RATIS, THREE);
 
-    assertNotNull(multipartInfo);
+    Assert.assertNotNull(multipartInfo);
     String uploadID = multipartInfo.getUploadID();
     Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
     Assert.assertEquals(bucketName, multipartInfo.getBucketName());
     Assert.assertEquals(keyName, multipartInfo.getKeyName());
-    assertNotNull(multipartInfo.getUploadID());
+    Assert.assertNotNull(multipartInfo.getUploadID());
 
     int partNumber = 1;
 
@@ -250,9 +260,9 @@ public class TestOzoneClientMultipartUploadV1 {
     OmMultipartCommitUploadPartInfo commitUploadPartInfo = ozoneOutputStream
             .getCommitUploadPartInfo();
 
-    assertNotNull(commitUploadPartInfo);
+    Assert.assertNotNull(commitUploadPartInfo);
     String partName = commitUploadPartInfo.getPartName();
-    assertNotNull(commitUploadPartInfo.getPartName());
+    Assert.assertNotNull(commitUploadPartInfo.getPartName());
 
     //Overwrite the part by creating part key with same part number.
     sampleData = "sample Data Changed";
@@ -264,12 +274,230 @@ public class TestOzoneClientMultipartUploadV1 {
     commitUploadPartInfo = ozoneOutputStream
             .getCommitUploadPartInfo();
 
-    assertNotNull(commitUploadPartInfo);
-    assertNotNull(commitUploadPartInfo.getPartName());
+    Assert.assertNotNull(commitUploadPartInfo);
+    Assert.assertNotNull(commitUploadPartInfo.getPartName());
 
     // PartName should be different from old part Name.
-    assertNotEquals("Part names should be different", partName,
+    Assert.assertNotEquals("Part names should be different", partName,
             commitUploadPartInfo.getPartName());
   }
 
+  @Test
+  public void testMultipartUploadWithPartsLessThanMinSize() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    // Initiate multipart upload
+    String uploadID = initiateMultipartUpload(bucket, keyName, STAND_ALONE,
+            ONE);
+
+    // Upload Parts
+    Map<Integer, String> partsMap = new TreeMap<>();
+    // Uploading part 1 with less than min size
+    String partName = uploadPart(bucket, keyName, uploadID, 1,
+            "data".getBytes(UTF_8));
+    partsMap.put(1, partName);
+
+    partName = uploadPart(bucket, keyName, uploadID, 2,
+            "data".getBytes(UTF_8));
+    partsMap.put(2, partName);
+
+    // Complete multipart upload
+    OzoneTestUtils.expectOmException(OMException.ResultCodes.ENTITY_TOO_SMALL,
+        () -> completeMultipartUpload(bucket, keyName, uploadID, partsMap));
+  }
+
+  @Test
+  public void testMultipartUploadWithPartsMisMatchWithListSizeDifferent()
+          throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    String uploadID = initiateMultipartUpload(bucket, keyName, STAND_ALONE,
+            ONE);
+
+    // No parts have been uploaded, so completing with a non-empty parts list
+    // should throw an INVALID_PART error.
+    TreeMap<Integer, String> partsMap = new TreeMap<>();
+    partsMap.put(1, UUID.randomUUID().toString());
+
+    OzoneTestUtils.expectOmException(OMException.ResultCodes.INVALID_PART,
+        () -> completeMultipartUpload(bucket, keyName, uploadID, partsMap));
+  }
+
+  @Test
+  public void testMultipartUploadWithPartsMisMatchWithIncorrectPartName()
+          throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    String uploadID = initiateMultipartUpload(bucket, keyName, STAND_ALONE,
+            ONE);
+
+    uploadPart(bucket, keyName, uploadID, 1, "data".getBytes(UTF_8));
+
+    // Passing an incorrect part name should throw an INVALID_PART error.
+    TreeMap<Integer, String> partsMap = new TreeMap<>();
+    partsMap.put(1, UUID.randomUUID().toString());
+
+    OzoneTestUtils.expectOmException(OMException.ResultCodes.INVALID_PART,
+        () -> completeMultipartUpload(bucket, keyName, uploadID, partsMap));
+  }
+
+  @Test
+  public void testMultipartUploadWithMissingParts() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    String uploadID = initiateMultipartUpload(bucket, keyName, STAND_ALONE,
+            ONE);
+
+    uploadPart(bucket, keyName, uploadID, 1, "data".getBytes(UTF_8));
+
+    // Passing an incorrect part number should throw an INVALID_PART error.
+    TreeMap<Integer, String> partsMap = new TreeMap<>();
+    partsMap.put(3, "random");
+
+    OzoneTestUtils.expectOmException(OMException.ResultCodes.INVALID_PART,
+        () -> completeMultipartUpload(bucket, keyName, uploadID, partsMap));
+  }
+
+  @Test
+  public void testCommitPartAfterCompleteUpload() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String parentDir = "a/b/c/d/";
+    String keyName = parentDir + UUID.randomUUID().toString();
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    OmMultipartInfo omMultipartInfo = bucket.initiateMultipartUpload(keyName,
+            STAND_ALONE, ONE);
+
+    Assert.assertNotNull(omMultipartInfo.getUploadID());
+
+    String uploadID = omMultipartInfo.getUploadID();
+
+    // upload part 1.
+    byte[] data = generateData(5 * 1024 * 1024,
+            (byte) RandomUtils.nextLong());
+    OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName,
+            data.length, 1, uploadID);
+    ozoneOutputStream.write(data, 0, data.length);
+    ozoneOutputStream.close();
+
+    OmMultipartCommitUploadPartInfo omMultipartCommitUploadPartInfo =
+            ozoneOutputStream.getCommitUploadPartInfo();
+
+    // Do not close output stream for part 2.
+    ozoneOutputStream = bucket.createMultipartKey(keyName,
+            data.length, 2, omMultipartInfo.getUploadID());
+    ozoneOutputStream.write(data, 0, data.length);
+
+    Map<Integer, String> partsMap = new LinkedHashMap<>();
+    partsMap.put(1, omMultipartCommitUploadPartInfo.getPartName());
+    OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo =
+            bucket.completeMultipartUpload(keyName,
+                    uploadID, partsMap);
+    Assert.assertNotNull(omMultipartUploadCompleteInfo);
+
+    Assert.assertNotNull(omMultipartCommitUploadPartInfo);
+
+    byte[] fileContent = new byte[data.length];
+    OzoneInputStream inputStream = bucket.readKey(keyName);
+    inputStream.read(fileContent);
+    StringBuilder sb = new StringBuilder(data.length);
+
+    // Combine all parts' data and check that it matches the data read back.
+    String part1 = new String(data, UTF_8);
+    sb.append(part1);
+    Assert.assertEquals(sb.toString(), new String(fileContent, UTF_8));
+
+    try {
+      ozoneOutputStream.close();
+      Assert.fail("testCommitPartAfterCompleteUpload failed");
+    } catch (IOException ex) {
+      Assert.assertTrue(ex instanceof OMException);
+      Assert.assertEquals(NO_SUCH_MULTIPART_UPLOAD_ERROR,
+              ((OMException) ex).getResult());
+    }
+  }
+
+  private String initiateMultipartUpload(OzoneBucket bucket, String keyName,
+      ReplicationType replicationType, ReplicationFactor replicationFactor)
+          throws Exception {
+    OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
+            replicationType, replicationFactor);
+
+    String uploadID = multipartInfo.getUploadID();
+    Assert.assertNotNull(uploadID);
+
+    return uploadID;
+  }
+
+  private String uploadPart(OzoneBucket bucket, String keyName, String
+      uploadID, int partNumber, byte[] data) throws Exception {
+
+    OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName,
+            data.length, partNumber, uploadID);
+    ozoneOutputStream.write(data, 0,
+            data.length);
+    ozoneOutputStream.close();
+
+    OmMultipartCommitUploadPartInfo omMultipartCommitUploadPartInfo =
+            ozoneOutputStream.getCommitUploadPartInfo();
+
+    Assert.assertNotNull(omMultipartCommitUploadPartInfo);
+    Assert.assertNotNull(omMultipartCommitUploadPartInfo.getPartName());
+
+    return omMultipartCommitUploadPartInfo.getPartName();
+  }
+
+  private void completeMultipartUpload(OzoneBucket bucket, String keyName,
+      String uploadID, Map<Integer, String> partsMap) throws Exception {
+    OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo = bucket
+            .completeMultipartUpload(keyName, uploadID, partsMap);
+
+    Assert.assertNotNull(omMultipartUploadCompleteInfo);
+    Assert.assertEquals(omMultipartUploadCompleteInfo.getBucket(), bucket
+            .getName());
+    Assert.assertEquals(omMultipartUploadCompleteInfo.getVolume(), bucket
+            .getVolumeName());
+    Assert.assertEquals(omMultipartUploadCompleteInfo.getKey(), keyName);
+    Assert.assertNotNull(omMultipartUploadCompleteInfo.getHash());
+  }
+
+  private byte[] generateData(int size, byte val) {
+    byte[] chars = new byte[size];
+    Arrays.fill(chars, val);
+    return chars;
+  }
+
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index 40a5396..2b39b8d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadAbortReq
 import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCommitPartRequest;
 import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCommitPartRequestV1;
 import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCompleteRequest;
+import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCompleteRequestV1;
 import org.apache.hadoop.ozone.om.request.s3.security.S3GetSecretRequest;
 import org.apache.hadoop.ozone.om.request.security.OMCancelDelegationTokenRequest;
 import org.apache.hadoop.ozone.om.request.security.OMGetDelegationTokenRequest;
@@ -196,6 +197,9 @@ public final class OzoneManagerRatisUtils {
     case AbortMultiPartUpload:
       return new S3MultipartUploadAbortRequest(omRequest);
     case CompleteMultiPartUpload:
+      if (isBucketFSOptimized()) {
+        return new S3MultipartUploadCompleteRequestV1(omRequest);
+      }
       return new S3MultipartUploadCompleteRequest(omRequest);
     case AddAcl:
     case RemoveAcl:
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
index 7f2d2c5..ebf86ce 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
@@ -542,9 +542,10 @@ public final class OMFileRequest {
    * @param omMetadataMgr
    * @param batchOp
    * @param omFileInfo
+   * @return db file key
    * @throws IOException
    */
-  public static void addToFileTable(OMMetadataManager omMetadataMgr,
+  public static String addToFileTable(OMMetadataManager omMetadataMgr,
                                     BatchOperation batchOp,
                                     OmKeyInfo omFileInfo)
           throws IOException {
@@ -554,6 +555,7 @@ public final class OMFileRequest {
 
     omMetadataMgr.getKeyTable().putWithBatch(batchOp,
             dbFileKey, omFileInfo);
+    return dbFileKey;
   }
 
   /**
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestV1.java
index 3c49610..64991de 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestV1.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestV1.java
@@ -143,7 +143,7 @@ public class OMKeyCommitRequestV1 extends OMKeyCommitRequest {
       omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime());
 
       // Update the block length for each block
-      omKeyInfo.updateLocationInfoList(locationInfoList);
+      omKeyInfo.updateLocationInfoList(locationInfoList, false);
 
       // Set the UpdateID to current transactionLogIndex
       omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequestV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequestV1.java
index 5546010..7aa21cf 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequestV1.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequestV1.java
@@ -86,7 +86,8 @@ public class S3MultipartUploadCommitPartRequestV1
     boolean acquiredLock = false;
 
     IOException exception = null;
-    String partName = null;
+    String dbPartName;
+    String fullKeyPartName = null;
     OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
         getOmRequest());
     OMClientResponse omClientResponse = null;
@@ -95,8 +96,8 @@ public class S3MultipartUploadCommitPartRequestV1
     OmKeyInfo omKeyInfo = null;
     String multipartKey = null;
     OmMultipartKeyInfo multipartKeyInfo = null;
-    Result result = null;
-    OmBucketInfo omBucketInfo = null;
+    Result result;
+    OmBucketInfo omBucketInfo;
     OmBucketInfo copyBucketInfo = null;
     try {
       keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
@@ -141,14 +142,24 @@ public class S3MultipartUploadCommitPartRequestV1
       omKeyInfo.setDataSize(keyArgs.getDataSize());
       omKeyInfo.updateLocationInfoList(keyArgs.getKeyLocationsList().stream()
           .map(OmKeyLocationInfo::getFromProtobuf)
-          .collect(Collectors.toList()));
+          .collect(Collectors.toList()), true);
       // Set Modification time
       omKeyInfo.setModificationTime(keyArgs.getModificationTime());
       // Set the UpdateID to current transactionLogIndex
       omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
 
-      String ozoneKey = omMetadataManager.getOzonePathKey(parentID, fileName);
-      partName = ozoneKey + clientID;
+      /**
+       * The format of the PartName stored in MultipartInfoTable is
+       * "fileName + ClientID".
+       *
+       * Contract is that all part names present in a multipart info will
+       * have same key prefix path.
+       *
+       * For example:
+       *        /vol1/buck1/a/b/c/part-1, /vol1/buck1/a/b/c/part-2,
+       *        /vol1/buck1/a/b/c/part-n
+       */
+      dbPartName = fileName + clientID;
 
       if (multipartKeyInfo == null) {
         // This can occur when user started uploading part by the time commit
@@ -168,9 +179,9 @@ public class S3MultipartUploadCommitPartRequestV1
       // Build this multipart upload part info.
       OzoneManagerProtocolProtos.PartKeyInfo.Builder partKeyInfo =
           OzoneManagerProtocolProtos.PartKeyInfo.newBuilder();
-      partKeyInfo.setPartName(partName);
+      partKeyInfo.setPartName(dbPartName);
       partKeyInfo.setPartNumber(partNumber);
-      partKeyInfo.setPartKeyInfo(omKeyInfo.getProtobuf(
+      partKeyInfo.setPartKeyInfo(omKeyInfo.getProtobuf(fileName,
           getOmRequest().getVersion()));
 
       // Add this part information in to multipartKeyInfo.
@@ -207,9 +218,15 @@ public class S3MultipartUploadCommitPartRequestV1
           keyArgs.getKeyLocationsList().size() * scmBlockSize * factor;
       omBucketInfo.incrUsedBytes(correctedSpace);
 
+      // Prepare the response: set the user-given full key part name as the
+      // 'partName' attribute of the response object.
+      String fullOzoneKeyName = omMetadataManager.getOzoneKey(
+              volumeName, bucketName, keyName);
+      fullKeyPartName = fullOzoneKeyName + clientID;
       omResponse.setCommitMultiPartUploadResponse(
           MultipartCommitUploadPartResponse.newBuilder()
-              .setPartName(partName));
+              .setPartName(fullKeyPartName));
+
       omClientResponse = new S3MultipartUploadCommitPartResponseV1(
           omResponse.build(), multipartKey, openKey,
           multipartKeyInfo, oldPartKeyInfo, omKeyInfo,
@@ -234,8 +251,8 @@ public class S3MultipartUploadCommitPartRequestV1
     }
 
     logResult(ozoneManager, multipartCommitUploadPartRequest, keyArgs,
-            auditMap, volumeName, bucketName, keyName, exception, partName,
-            result);
+            auditMap, volumeName, bucketName, keyName, exception,
+            fullKeyPartName, result);
 
     return omClientResponse;
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
index 4590889..62c892d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
@@ -18,14 +18,12 @@
 
 package org.apache.hadoop.ozone.om.request.s3.multipart;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
+import com.google.common.base.Optional;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.OMAction;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -36,7 +34,9 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -49,15 +49,19 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMReque
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
 import org.apache.hadoop.util.Time;
-import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
-import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Optional;
-import org.apache.commons.codec.digest.DigestUtils;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_MULTIPART_MIN_SIZE;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE;
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Handle Multipart upload complete request.
@@ -169,129 +173,25 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
         }
 
         // First Check for Invalid Part Order.
-        int prevPartNumber = partsList.get(0).getPartNumber();
         List< Integer > partNumbers = new ArrayList<>();
-        int partsListSize = partsList.size();
-        partNumbers.add(prevPartNumber);
-        for (int i = 1; i < partsListSize; i++) {
-          int currentPartNumber = partsList.get(i).getPartNumber();
-          if (prevPartNumber >= currentPartNumber) {
-            LOG.error("PartNumber at index {} is {}, and its previous " +
-                    "partNumber at index {} is {} for ozonekey is " +
-                    "{}", i, currentPartNumber, i - 1, prevPartNumber,
-                ozoneKey);
-            throw new OMException(
-                failureMessage(requestedVolume, requestedBucket, keyName) +
-                " because parts are in Invalid order.",
-                OMException.ResultCodes.INVALID_PART_ORDER);
-          }
-          prevPartNumber = currentPartNumber;
-          partNumbers.add(prevPartNumber);
-        }
-
+        int partsListSize = getPartsListSize(requestedVolume,
+                requestedBucket, keyName, ozoneKey, partNumbers, partsList);
 
         List<OmKeyLocationInfo> partLocationInfos = new ArrayList<>();
-        long dataSize = 0;
-        int currentPartCount = 0;
-        // Now do actual logic, and check for any Invalid part during this.
-        for (OzoneManagerProtocolProtos.Part part : partsList) {
-          currentPartCount++;
-          int partNumber = part.getPartNumber();
-          String partName = part.getPartName();
-
-          PartKeyInfo partKeyInfo = partKeyInfoMap.get(partNumber);
-
-          if (partKeyInfo == null ||
-              !partName.equals(partKeyInfo.getPartName())) {
-            String omPartName = partKeyInfo == null ? null :
-                partKeyInfo.getPartName();
-            throw new OMException(
-                failureMessage(requestedVolume, requestedBucket, keyName) +
-                ". Provided Part info is { " + partName + ", " + partNumber +
-                "}, whereas OM has partName " + omPartName,
-                OMException.ResultCodes.INVALID_PART);
-          }
-
-          OmKeyInfo currentPartKeyInfo = OmKeyInfo
-              .getFromProtobuf(partKeyInfo.getPartKeyInfo());
-
-          // Except for last part all parts should have minimum size.
-          if (currentPartCount != partsListSize) {
-            if (currentPartKeyInfo.getDataSize() <
-                ozoneManager.getMinMultipartUploadPartSize()) {
-              LOG.error("MultipartUpload: {} Part number: {} size {}  is less" +
-                      " than minimum part size {}", ozoneKey,
-                  partKeyInfo.getPartNumber(), currentPartKeyInfo.getDataSize(),
-                  ozoneManager.getMinMultipartUploadPartSize());
-              throw new OMException(
-                  failureMessage(requestedVolume, requestedBucket, keyName) +
-                  ". Entity too small.",
-                  OMException.ResultCodes.ENTITY_TOO_SMALL);
-            }
-          }
-
-          // As all part keys will have only one version.
-          OmKeyLocationInfoGroup currentKeyInfoGroup = currentPartKeyInfo
-              .getKeyLocationVersions().get(0);
-
-          // Set partNumber in each block.
-          currentKeyInfoGroup.getLocationList().forEach(
-              omKeyLocationInfo -> omKeyLocationInfo.setPartNumber(partNumber));
-
-          partLocationInfos.addAll(currentKeyInfoGroup.getLocationList());
-          dataSize += currentPartKeyInfo.getDataSize();
-        }
+        long dataSize = getMultipartDataSize(requestedVolume, requestedBucket,
+                keyName, ozoneKey, partKeyInfoMap, partsListSize,
+                partLocationInfos, partsList, ozoneManager);
 
         // All parts have same replication information. Here getting from last
         // part.
-        HddsProtos.ReplicationType type = partKeyInfoMap.lastEntry().getValue()
-            .getPartKeyInfo().getType();
-        HddsProtos.ReplicationFactor factor =
-            partKeyInfoMap.lastEntry().getValue().getPartKeyInfo().getFactor();
-
-        OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(ozoneKey);
-        if (omKeyInfo == null) {
-          // This is a newly added key, it does not have any versions.
-          OmKeyLocationInfoGroup keyLocationInfoGroup = new
-              OmKeyLocationInfoGroup(0, partLocationInfos, true);
-
-          // Get the objectID of the key from OpenKeyTable
-          OmKeyInfo dbOpenKeyInfo = omMetadataManager.getOpenKeyTable()
-              .get(multipartKey);
-
-          // A newly created key, this is the first version.
-          OmKeyInfo.Builder builder =
-              new OmKeyInfo.Builder().setVolumeName(volumeName)
-              .setBucketName(bucketName).setKeyName(keyName)
-              .setReplicationFactor(factor).setReplicationType(type)
-              .setCreationTime(keyArgs.getModificationTime())
-              .setModificationTime(keyArgs.getModificationTime())
-              .setDataSize(dataSize)
-              .setFileEncryptionInfo(dbOpenKeyInfo.getFileEncryptionInfo())
-              .setOmKeyLocationInfos(
-                  Collections.singletonList(keyLocationInfoGroup))
-              .setAcls(OzoneAclUtil.fromProtobuf(keyArgs.getAclsList()));
-          // Check if db entry has ObjectID. This check is required because
-          // it is possible that between multipart key uploads and complete,
-          // we had an upgrade.
-          if (dbOpenKeyInfo.getObjectID() != 0) {
-            builder.setObjectID(dbOpenKeyInfo.getObjectID());
-          }
-          omKeyInfo = builder.build();
-        } else {
-          // Already a version exists, so we should add it as a new version.
-          // But now as versioning is not supported, just following the commit
-          // key approach. When versioning support comes, then we can uncomment
-          // below code keyInfo.addNewVersion(locations);
-          omKeyInfo.updateLocationInfoList(partLocationInfos, true);
-          omKeyInfo.setModificationTime(keyArgs.getModificationTime());
-          omKeyInfo.setDataSize(dataSize);
-        }
-        omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
+        OmKeyInfo omKeyInfo = getOmKeyInfo(ozoneManager, trxnLogIndex, keyArgs,
+                volumeName, bucketName, keyName, multipartKey,
+                omMetadataManager, ozoneKey, partKeyInfoMap, partLocationInfos,
+                dataSize);
 
         //Find all unused parts.
-        List< OmKeyInfo > unUsedParts = new ArrayList<>();
-        for (Map.Entry< Integer, PartKeyInfo > partKeyInfo :
+        List<OmKeyInfo> unUsedParts = new ArrayList<>();
+        for (Map.Entry< Integer, PartKeyInfo> partKeyInfo :
             partKeyInfoMap.entrySet()) {
           if (!partNumbers.contains(partKeyInfo.getKey())) {
             unUsedParts.add(OmKeyInfo
@@ -334,6 +234,19 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
       }
     }
 
+    logResult(ozoneManager, multipartUploadCompleteRequest, partsList,
+            auditMap, volumeName, bucketName, keyName, exception, result);
+
+    return omClientResponse;
+  }
+
+  @SuppressWarnings("checkstyle:ParameterNumber")
+  protected void logResult(OzoneManager ozoneManager,
+      MultipartUploadCompleteRequest multipartUploadCompleteRequest,
+      List<OzoneManagerProtocolProtos.Part> partsList,
+      Map<String, String> auditMap, String volumeName,
+      String bucketName, String keyName, IOException exception,
+      Result result) {
     auditMap.put(OzoneConsts.MULTIPART_LIST, partsList.toString());
 
     // audit log
@@ -356,8 +269,183 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
       LOG.error("Unrecognized Result for S3MultipartUploadCommitRequest: {}",
           multipartUploadCompleteRequest);
     }
+  }
 
-    return omClientResponse;
+  @SuppressWarnings("checkstyle:ParameterNumber")
+  protected OmKeyInfo getOmKeyInfo(OzoneManager ozoneManager, long trxnLogIndex,
+      KeyArgs keyArgs, String volumeName, String bucketName, String keyName,
+      String multipartKey, OMMetadataManager omMetadataManager,
+      String ozoneKey, TreeMap<Integer, PartKeyInfo> partKeyInfoMap,
+      List<OmKeyLocationInfo> partLocationInfos, long dataSize)
+          throws IOException {
+    HddsProtos.ReplicationType type = partKeyInfoMap.lastEntry().getValue()
+        .getPartKeyInfo().getType();
+    HddsProtos.ReplicationFactor factor =
+        partKeyInfoMap.lastEntry().getValue().getPartKeyInfo().getFactor();
+
+    OmKeyInfo omKeyInfo = getOmKeyInfoFromKeyTable(ozoneKey, keyName,
+            omMetadataManager);
+    if (omKeyInfo == null) {
+      // This is a newly added key, it does not have any versions.
+      OmKeyLocationInfoGroup keyLocationInfoGroup = new
+          OmKeyLocationInfoGroup(0, partLocationInfos, true);
+
+      // Get the objectID of the key from OpenKeyTable
+      OmKeyInfo dbOpenKeyInfo = getOmKeyInfoFromOpenKeyTable(multipartKey,
+              keyName, omMetadataManager);
+
+      // A newly created key, this is the first version.
+      OmKeyInfo.Builder builder =
+          new OmKeyInfo.Builder().setVolumeName(volumeName)
+          .setBucketName(bucketName).setKeyName(dbOpenKeyInfo.getKeyName())
+          .setReplicationFactor(factor).setReplicationType(type)
+          .setCreationTime(keyArgs.getModificationTime())
+          .setModificationTime(keyArgs.getModificationTime())
+          .setDataSize(dataSize)
+          .setFileEncryptionInfo(dbOpenKeyInfo.getFileEncryptionInfo())
+          .setOmKeyLocationInfos(
+              Collections.singletonList(keyLocationInfoGroup))
+          .setAcls(OzoneAclUtil.fromProtobuf(keyArgs.getAclsList()));
+      // Check if db entry has ObjectID. This check is required because
+      // it is possible that between multipart key uploads and complete,
+      // we had an upgrade.
+      if (dbOpenKeyInfo.getObjectID() != 0) {
+        builder.setObjectID(dbOpenKeyInfo.getObjectID());
+      }
+      updatePrefixFSOInfo(dbOpenKeyInfo, builder);
+      omKeyInfo = builder.build();
+    } else {
+      // Already a version exists, so we should add it as a new version.
+      // But now as versioning is not supported, just following the commit
+      // key approach. When versioning support comes, then we can uncomment
+      // below code keyInfo.addNewVersion(locations);
+      omKeyInfo.updateLocationInfoList(partLocationInfos, true);
+      omKeyInfo.setModificationTime(keyArgs.getModificationTime());
+      omKeyInfo.setDataSize(dataSize);
+    }
+    omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
+    return omKeyInfo;
+  }
+
+  protected void updatePrefixFSOInfo(OmKeyInfo dbOpenKeyInfo,
+      OmKeyInfo.Builder builder) {
+    // FSOBucket is disabled. Do nothing.
+  }
+
+  /**
+   * Reads the entry stored under {@code dbOzoneKey} from the key table.
+   * Kept as a separate protected method so a subclass can read from a
+   * different table.
+   *
+   * @param dbOzoneKey DB key to look up
+   * @param keyName key name; not used by this implementation
+   * @param omMetadataManager metadata manager providing the key table
+   * @return result of the table lookup; the caller in this class treats
+   *         {@code null} as "no committed key yet"
+   * @throws IOException if the table lookup fails
+   */
+  protected OmKeyInfo getOmKeyInfoFromKeyTable(String dbOzoneKey,
+      String keyName, OMMetadataManager omMetadataManager) throws IOException {
+    return omMetadataManager.getKeyTable().get(dbOzoneKey);
+  }
+
+  /**
+   * Reads the entry stored under {@code dbMultipartKey} from the open key
+   * table. Kept as a separate protected method so a subclass can read from a
+   * different table.
+   *
+   * @param dbMultipartKey DB key of the multipart upload to look up
+   * @param keyName key name; not used by this implementation
+   * @param omMetadataManager metadata manager providing the open key table
+   * @return result of the table lookup
+   * @throws IOException if the table lookup fails
+   */
+  protected OmKeyInfo getOmKeyInfoFromOpenKeyTable(String dbMultipartKey,
+      String keyName, OMMetadataManager omMetadataManager) throws IOException {
+    return omMetadataManager.getOpenKeyTable().get(dbMultipartKey);
+  }
+
+  /**
+   * Checks that the requested parts are listed in strictly increasing part
+   * number order and collects their part numbers.
+   *
+   * @param requestedVolume volume name, used only in the failure message
+   * @param requestedBucket bucket name, used only in the failure message
+   * @param keyName key name, used only in the failure message
+   * @param ozoneKey key string, used only in the error log
+   * @param partNumbers output list; every part number of {@code partsList} is
+   *                    appended to it in request order
+   * @param partsList parts sent by the client; element 0 is read
+   *                  unconditionally, so the list must not be empty
+   * @return number of entries in {@code partsList}
+   * @throws OMException with {@code INVALID_PART_ORDER} if a part number is
+   *                     not greater than the one before it
+   */
+  protected int getPartsListSize(String requestedVolume,
+      String requestedBucket, String keyName, String ozoneKey,
+      List<Integer> partNumbers,
+      List<OzoneManagerProtocolProtos.Part> partsList) throws OMException {
+    int prevPartNumber = partsList.get(0).getPartNumber();
+    int partsListSize = partsList.size();
+    partNumbers.add(prevPartNumber);
+    for (int i = 1; i < partsListSize; i++) {
+      int currentPartNumber = partsList.get(i).getPartNumber();
+      // Equal numbers are rejected too: the order must be strictly increasing.
+      if (prevPartNumber >= currentPartNumber) {
+        LOG.error("PartNumber at index {} is {}, and its previous " +
+                "partNumber at index {} is {} for ozonekey is " +
+                "{}", i, currentPartNumber, i - 1, prevPartNumber,
+            ozoneKey);
+        throw new OMException(
+            failureMessage(requestedVolume, requestedBucket, keyName) +
+            " because parts are in Invalid order.",
+            OMException.ResultCodes.INVALID_PART_ORDER);
+      }
+      prevPartNumber = currentPartNumber;
+      partNumbers.add(prevPartNumber);
+    }
+    return partsListSize;
+  }
+
+  /**
+   * Validates every requested part against the parts recorded in OM and sums
+   * up the data size of the completed key.
+   * <p>
+   * For each entry of {@code partsList} the part name sent by the client must
+   * match the name derived from the stored {@link PartKeyInfo}, and every part
+   * except the last one must not be smaller than
+   * {@code ozoneManager.getMinMultipartUploadPartSize()}. The block locations
+   * of each accepted part are tagged with its part number and appended to
+   * {@code partLocationInfos}.
+   *
+   * @param requestedVolume volume name, used for messages and part names
+   * @param requestedBucket bucket name, used for messages and part names
+   * @param keyName key name, used for messages and part names
+   * @param ozoneKey key string, used only in the error log
+   * @param partKeyInfoMap parts recorded in OM, keyed by part number
+   * @param partsListSize number of entries in {@code partsList}
+   * @param partLocationInfos output list receiving the block locations
+   * @param partsList parts sent by the client
+   * @param ozoneManager source of the minimum part size and metadata manager
+   * @return sum of the data sizes of all parts in {@code partsList}
+   * @throws OMException with {@code INVALID_PART} if a part is unknown or its
+   *                     name does not match, or {@code ENTITY_TOO_SMALL} if a
+   *                     non-last part is below the minimum size
+   */
+  @SuppressWarnings("checkstyle:ParameterNumber")
+  protected long getMultipartDataSize(String requestedVolume,
+      String requestedBucket, String keyName, String ozoneKey,
+      TreeMap<Integer, PartKeyInfo> partKeyInfoMap,
+      int partsListSize, List<OmKeyLocationInfo> partLocationInfos,
+      List<OzoneManagerProtocolProtos.Part> partsList,
+      OzoneManager ozoneManager) throws OMException {
+    long dataSize = 0;
+    int currentPartCount = 0;
+    // Now do actual logic, and check for any Invalid part during this.
+    for (OzoneManagerProtocolProtos.Part part : partsList) {
+      currentPartCount++;
+      int partNumber = part.getPartNumber();
+      String partName = part.getPartName();
+
+      PartKeyInfo partKeyInfo = partKeyInfoMap.get(partNumber);
+
+      String dbPartName = null;
+      if (partKeyInfo != null) {
+        dbPartName = preparePartName(requestedVolume, requestedBucket, keyName,
+                partKeyInfo, ozoneManager.getMetadataManager());
+      }
+      // Reject an unknown part number explicitly instead of relying on the
+      // name comparison alone: partKeyInfo is dereferenced right below.
+      if (partKeyInfo == null || !StringUtils.equals(partName, dbPartName)) {
+        throw new OMException(
+            failureMessage(requestedVolume, requestedBucket, keyName) +
+            ". Provided Part info is { " + partName + ", " + partNumber +
+            "}, whereas OM has partName " + dbPartName,
+            OMException.ResultCodes.INVALID_PART);
+      }
+
+      OmKeyInfo currentPartKeyInfo = OmKeyInfo
+          .getFromProtobuf(partKeyInfo.getPartKeyInfo());
+
+      // Except for last part all parts should have minimum size.
+      if (currentPartCount != partsListSize) {
+        if (currentPartKeyInfo.getDataSize() <
+            ozoneManager.getMinMultipartUploadPartSize()) {
+          LOG.error("MultipartUpload: {} Part number: {} size {}  is less" +
+                  " than minimum part size {}", ozoneKey,
+              partKeyInfo.getPartNumber(), currentPartKeyInfo.getDataSize(),
+              ozoneManager.getMinMultipartUploadPartSize());
+          throw new OMException(
+              failureMessage(requestedVolume, requestedBucket, keyName) +
+                  ". Entity too small.",
+              OMException.ResultCodes.ENTITY_TOO_SMALL);
+        }
+      }
+
+      // As all part keys will have only one version.
+      OmKeyLocationInfoGroup currentKeyInfoGroup = currentPartKeyInfo
+          .getKeyLocationVersions().get(0);
+
+      // Set partNumber in each block.
+      currentKeyInfoGroup.getLocationList().forEach(
+          omKeyLocationInfo -> omKeyLocationInfo.setPartNumber(partNumber));
+
+      partLocationInfos.addAll(currentKeyInfoGroup.getLocationList());
+      dataSize += currentPartKeyInfo.getDataSize();
+    }
+    return dataSize;
+  }
+
+  /**
+   * Builds the part name that is compared against the one sent by the client.
+   * <p>
+   * When {@code OzoneManagerRatisUtils.isBucketFSOptimized()} is true, the
+   * stored part name is appended to the parent path of {@code keyName} and the
+   * result is passed through {@code omMetadataManager.getOzoneKey}; otherwise
+   * the stored part name is returned unchanged.
+   *
+   * @param requestedVolume volume name passed to {@code getOzoneKey}
+   * @param requestedBucket bucket name passed to {@code getOzoneKey}
+   * @param keyName key name whose parent path prefixes the stored part name
+   * @param partKeyInfo part recorded in OM
+   * @param omMetadataManager used to build the full key string
+   * @return part name in the form expected from the client
+   */
+  private String preparePartName(String requestedVolume,
+      String requestedBucket, String keyName, PartKeyInfo partKeyInfo,
+      OMMetadataManager omMetadataManager) {
+
+    if (!OzoneManagerRatisUtils.isBucketFSOptimized()) {
+      return partKeyInfo.getPartName();
+    }
+
+    // Method-local buffer, so the unsynchronized StringBuilder is sufficient.
+    String parentPath = OzoneFSUtils.getParent(keyName);
+    StringBuilder keyPath = new StringBuilder(parentPath);
+    keyPath.append(partKeyInfo.getPartName());
+
+    return omMetadataManager.getOzoneKey(requestedVolume,
+            requestedBucket, keyPath.toString());
   }
 
   private static String failureMessage(String volume, String bucket,
@@ -366,7 +454,7 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
         volume + " bucket: " + bucket + " key: " + keyName;
   }
 
-  private void updateCache(OMMetadataManager omMetadataManager,
+  protected void updateCache(OMMetadataManager omMetadataManager,
       String ozoneKey, String multipartKey, OmKeyInfo omKeyInfo,
       long transactionLogIndex) {
     // Update cache.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequestV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequestV1.java
new file mode 100644
index 0000000..4ab9ee7
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequestV1.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.s3.multipart;
+
+import com.google.common.base.Optional;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.s3.multipart.S3MultipartUploadCompleteResponseV1;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadCompleteRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadCompleteResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+import static org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.DIRECTORY_EXISTS;
+
+/**
+ * Handles Multipart upload complete requests using parentID-based DB keys.
+ */
+public class S3MultipartUploadCompleteRequestV1
+        extends S3MultipartUploadCompleteRequest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(S3MultipartUploadCompleteRequestV1.class);
+
+  public S3MultipartUploadCompleteRequestV1(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  @SuppressWarnings("methodlength")
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
+    MultipartUploadCompleteRequest multipartUploadCompleteRequest =
+        getOmRequest().getCompleteMultiPartUploadRequest();
+
+    KeyArgs keyArgs = multipartUploadCompleteRequest.getKeyArgs();
+
+    List<OzoneManagerProtocolProtos.Part> partsList =
+        multipartUploadCompleteRequest.getPartsListList();
+    Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
+
+    String volumeName = keyArgs.getVolumeName();
+    String bucketName = keyArgs.getBucketName();
+    final String requestedVolume = volumeName;
+    final String requestedBucket = bucketName;
+    String keyName = keyArgs.getKeyName();
+    String uploadID = keyArgs.getMultipartUploadID();
+    String dbMultipartKey;
+
+    ozoneManager.getMetrics().incNumCompleteMultipartUploads();
+
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+
+    boolean acquiredLock = false;
+    OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
+        getOmRequest());
+    OMClientResponse omClientResponse = null;
+    IOException exception = null;
+    Result result;
+    try {
+      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
+      volumeName = keyArgs.getVolumeName();
+      bucketName = keyArgs.getBucketName();
+
+      acquiredLock = omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
+          volumeName, bucketName);
+
+      validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+
+      String fileName = OzoneFSUtils.getFileName(keyName);
+      Path keyPath = Paths.get(keyName);
+      OMFileRequest.OMPathInfoV1 pathInfoV1 =
+              OMFileRequest.verifyDirectoryKeysInPath(omMetadataManager,
+                      volumeName, bucketName, keyName, keyPath);
+      long parentID = pathInfoV1.getLastKnownParentId();
+
+      dbMultipartKey = omMetadataManager.getMultipartKey(parentID,
+              fileName, uploadID);
+
+      String dbOzoneKey = omMetadataManager.getOzonePathKey(parentID, fileName);
+
+      String ozoneKey = omMetadataManager.getOzoneKey(
+              volumeName, bucketName, keyName);
+
+      OmMultipartKeyInfo multipartKeyInfo =
+              omMetadataManager.getMultipartInfoTable().get(dbMultipartKey);
+
+      // Check for directory exists with same name, if it exists throw error.
+      if (pathInfoV1.getDirectoryResult() == DIRECTORY_EXISTS) {
+        throw new OMException("Can not Complete MPU for file: " + keyName +
+                " as there is already directory in the given path",
+                NOT_A_FILE);
+      }
+
+      if (multipartKeyInfo == null) {
+        throw new OMException(
+            failureMessage(requestedVolume, requestedBucket, keyName),
+            OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
+      }
+      TreeMap<Integer, PartKeyInfo> partKeyInfoMap =
+          multipartKeyInfo.getPartKeyInfoMap();
+
+      if (partsList.size() > 0) {
+        if (partKeyInfoMap.size() == 0) {
+          LOG.error("Complete MultipartUpload failed for key {} , MPU Key has" +
+                  " no parts in OM, parts given to upload are {}", ozoneKey,
+              partsList);
+          throw new OMException(
+              failureMessage(requestedVolume, requestedBucket, keyName),
+              OMException.ResultCodes.INVALID_PART);
+        }
+
+        // First Check for Invalid Part Order.
+        List< Integer > partNumbers = new ArrayList<>();
+        int partsListSize = getPartsListSize(requestedVolume,
+                requestedBucket, keyName, ozoneKey, partNumbers, partsList);
+
+        List<OmKeyLocationInfo> partLocationInfos = new ArrayList<>();
+        long dataSize = getMultipartDataSize(requestedVolume, requestedBucket,
+                keyName, ozoneKey, partKeyInfoMap, partsListSize,
+                partLocationInfos, partsList, ozoneManager);
+
+        // All parts have same replication information. Here getting from last
+        // part.
+        OmKeyInfo omKeyInfo = getOmKeyInfo(ozoneManager, trxnLogIndex, keyArgs,
+                volumeName, bucketName, keyName, dbMultipartKey,
+                omMetadataManager, dbOzoneKey, partKeyInfoMap,
+                partLocationInfos, dataSize);
+
+        //Find all unused parts.
+        List< OmKeyInfo > unUsedParts = new ArrayList<>();
+        for (Map.Entry< Integer, PartKeyInfo > partKeyInfo :
+            partKeyInfoMap.entrySet()) {
+          if (!partNumbers.contains(partKeyInfo.getKey())) {
+            unUsedParts.add(OmKeyInfo
+                .getFromProtobuf(partKeyInfo.getValue().getPartKeyInfo()));
+          }
+        }
+
+        updateCache(omMetadataManager, dbOzoneKey, dbMultipartKey, omKeyInfo,
+            trxnLogIndex);
+
+        omResponse.setCompleteMultiPartUploadResponse(
+            MultipartUploadCompleteResponse.newBuilder()
+                .setVolume(requestedVolume)
+                .setBucket(requestedBucket)
+                .setKey(keyName)
+                .setHash(DigestUtils.sha256Hex(keyName)));
+
+        omClientResponse = new S3MultipartUploadCompleteResponseV1(
+            omResponse.build(), dbMultipartKey, omKeyInfo, unUsedParts);
+
+        result = Result.SUCCESS;
+      } else {
+        throw new OMException(
+            failureMessage(requestedVolume, requestedBucket, keyName) +
+            " because of empty part list",
+            OMException.ResultCodes.INVALID_REQUEST);
+      }
+
+    } catch (IOException ex) {
+      result = Result.FAILURE;
+      exception = ex;
+      omClientResponse = new S3MultipartUploadCompleteResponseV1(
+          createErrorOMResponse(omResponse, exception));
+    } finally {
+      addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
+          omDoubleBufferHelper);
+      if (acquiredLock) {
+        omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK,
+            volumeName, bucketName);
+      }
+    }
+
+    logResult(ozoneManager, multipartUploadCompleteRequest, partsList,
+            auditMap, volumeName, bucketName, keyName, exception, result);
+
+    return omClientResponse;
+  }
+
+  /**
+   * Looks up an already committed entry for this key via
+   * {@code OMFileRequest.getOmKeyInfoFromFileTable}.
+   * <p>
+   * NOTE(review): the first argument is understood to select the open file
+   * table when {@code true}. The open-key lookup in this class passes
+   * {@code true}, so the committed-key lookup passes {@code false} - confirm
+   * against OMFileRequest.
+   *
+   * @param dbOzoneFileKey DB key of the committed file
+   * @param keyName key name forwarded to the lookup
+   * @param omMetadataManager metadata manager forwarded to the lookup
+   * @return result of the lookup
+   * @throws IOException if the lookup fails
+   */
+  @Override
+  protected OmKeyInfo getOmKeyInfoFromKeyTable(String dbOzoneFileKey,
+      String keyName, OMMetadataManager omMetadataManager) throws IOException {
+    return OMFileRequest.getOmKeyInfoFromFileTable(false,
+            omMetadataManager, dbOzoneFileKey, keyName);
+  }
+
+  /**
+   * Looks up the open multipart entry via
+   * {@code OMFileRequest.getOmKeyInfoFromFileTable}, passing {@code true} as
+   * its first argument.
+   *
+   * @param dbMultipartKey DB key of the multipart upload
+   * @param keyName key name forwarded to the lookup
+   * @param omMetadataManager metadata manager forwarded to the lookup
+   * @return result of the lookup
+   * @throws IOException if the lookup fails
+   */
+  @Override
+  protected OmKeyInfo getOmKeyInfoFromOpenKeyTable(String dbMultipartKey,
+      String keyName, OMMetadataManager omMetadataManager) throws IOException {
+    return OMFileRequest.getOmKeyInfoFromFileTable(true,
+            omMetadataManager, dbMultipartKey, keyName);
+  }
+
+  /**
+   * Updates the table caches for a completed upload.
+   *
+   * @param omMetadataManager metadata manager owning the tables
+   * @param ozoneKey DB key under which the completed file is cached
+   * @param multipartKey DB key of the multipart upload being completed
+   * @param omKeyInfo key info of the completed file
+   * @param transactionLogIndex transaction index recorded with each entry
+   */
+  @Override
+  protected void updateCache(OMMetadataManager omMetadataManager,
+      String ozoneKey, String multipartKey, OmKeyInfo omKeyInfo,
+      long transactionLogIndex) {
+    // Update cache.
+    // 1. Add the completed file through OMFileRequest.addFileTableCacheEntry.
+    // 2. Mark multipartKey as absent in the caches of the tables returned by
+    //    getOpenKeyTable() and getMultipartInfoTable().
+    OMFileRequest.addFileTableCacheEntry(omMetadataManager, ozoneKey,
+            omKeyInfo, omKeyInfo.getFileName(), transactionLogIndex);
+
+    omMetadataManager.getOpenKeyTable().addCacheEntry(
+            new CacheKey<>(multipartKey),
+            new CacheValue<>(Optional.absent(), transactionLogIndex));
+    omMetadataManager.getMultipartInfoTable().addCacheEntry(
+            new CacheKey<>(multipartKey),
+            new CacheValue<>(Optional.absent(), transactionLogIndex));
+  }
+
+  /**
+   * Copies the parent object ID and file name of the open key entry onto the
+   * builder of the completed key.
+   *
+   * @param dbOpenKeyInfo open key entry of the multipart upload
+   * @param builder builder of the key info being completed
+   */
+  @Override
+  protected void updatePrefixFSOInfo(OmKeyInfo dbOpenKeyInfo,
+                                     OmKeyInfo.Builder builder) {
+    // updates parentID and fileName
+    builder.setParentObjectID(dbOpenKeyInfo.getParentObjectID());
+    builder.setFileName(dbOpenKeyInfo.getFileName());
+  }
+
+  private static String failureMessage(String volume, String bucket,
+                                       String keyName) {
+    return "Complete Multipart Upload Failed: volume: " +
+        volume + " bucket: " + bucket + " key: " + keyName;
+  }
+}
+
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java
index 20e398e..f593885 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java
@@ -96,4 +96,15 @@ public class S3MultipartUploadCompleteResponse extends OMClientResponse {
     }
   }
 
-}
\ No newline at end of file
+  protected String getMultipartKey() {
+    return multipartKey;
+  }
+
+  protected OmKeyInfo getOmKeyInfo() {
+    return omKeyInfo;
+  }
+
+  protected List<OmKeyInfo> getPartsUnusedList() {
+    return partsUnusedList;
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponseV1.java
similarity index 66%
copy from hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java
copy to hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponseV1.java
index 20e398e..bb31dce 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponseV1.java
@@ -18,82 +18,74 @@
 
 package org.apache.hadoop.ozone.om.response.s3.multipart;
 
-import java.io.IOException;
-import java.util.List;
-
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
-import org.apache.hadoop.ozone.om.response.OMClientResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .OMResponse;
-import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 
 import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.util.List;
 
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.MULTIPARTINFO_TABLE;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_KEY_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.MULTIPARTFILEINFO_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_FILE_TABLE;
 
 /**
  * Response for Multipart Upload Complete request.
  */
-@CleanupTableInfo(cleanupTables = {OPEN_KEY_TABLE, KEY_TABLE, DELETED_TABLE,
-    MULTIPARTINFO_TABLE})
-public class S3MultipartUploadCompleteResponse extends OMClientResponse {
-  private String multipartKey;
-  private OmKeyInfo omKeyInfo;
-  private List<OmKeyInfo> partsUnusedList;
+@CleanupTableInfo(cleanupTables = {OPEN_FILE_TABLE, FILE_TABLE, DELETED_TABLE,
+    MULTIPARTFILEINFO_TABLE})
+public class S3MultipartUploadCompleteResponseV1
+        extends S3MultipartUploadCompleteResponse {
 
-  public S3MultipartUploadCompleteResponse(
+  public S3MultipartUploadCompleteResponseV1(
       @Nonnull OMResponse omResponse,
       @Nonnull String multipartKey,
       @Nonnull OmKeyInfo omKeyInfo,
       @Nonnull List<OmKeyInfo> unUsedParts) {
-    super(omResponse);
-    this.partsUnusedList = unUsedParts;
-    this.multipartKey = multipartKey;
-    this.omKeyInfo = omKeyInfo;
+    super(omResponse, multipartKey, omKeyInfo, unUsedParts);
   }
 
   /**
    * For when the request is not successful.
    * For a successful request, the other constructor should be used.
    */
-  public S3MultipartUploadCompleteResponse(@Nonnull OMResponse omResponse) {
+  public S3MultipartUploadCompleteResponseV1(@Nonnull OMResponse omResponse) {
     super(omResponse);
     checkStatusNotOK();
   }
 
+
   @Override
   public void addToDBBatch(OMMetadataManager omMetadataManager,
       BatchOperation batchOperation) throws IOException {
 
     omMetadataManager.getOpenKeyTable().deleteWithBatch(batchOperation,
-        multipartKey);
+            getMultipartKey());
     omMetadataManager.getMultipartInfoTable().deleteWithBatch(batchOperation,
-        multipartKey);
+            getMultipartKey());
 
-    String ozoneKey = omMetadataManager.getOzoneKey(omKeyInfo.getVolumeName(),
-        omKeyInfo.getBucketName(), omKeyInfo.getKeyName());
-    omMetadataManager.getKeyTable().putWithBatch(batchOperation, ozoneKey,
-        omKeyInfo);
+    String dbFileKey = OMFileRequest.addToFileTable(omMetadataManager,
+            batchOperation, getOmKeyInfo());
 
-    if (!partsUnusedList.isEmpty()) {
+    if (!getPartsUnusedList().isEmpty()) {
       // Add unused parts to deleted key table.
       RepeatedOmKeyInfo repeatedOmKeyInfo = omMetadataManager.getDeletedTable()
-          .get(ozoneKey);
+              .get(dbFileKey);
       if (repeatedOmKeyInfo == null) {
-        repeatedOmKeyInfo = new RepeatedOmKeyInfo(partsUnusedList);
+        repeatedOmKeyInfo = new RepeatedOmKeyInfo(getPartsUnusedList());
       } else {
-        repeatedOmKeyInfo.addOmKeyInfo(omKeyInfo);
+        repeatedOmKeyInfo.addOmKeyInfo(getOmKeyInfo());
       }
 
       omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
-          ozoneKey, repeatedOmKeyInfo);
+              dbFileKey, repeatedOmKeyInfo);
     }
   }
+}
 
-}
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
index 9f6cff8..16cb4ae 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
@@ -115,7 +115,7 @@ public class TestS3MultipartRequest {
             keyName);
 
     S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
-        new S3InitiateMultipartUploadRequest(omRequest);
+        getS3InitiateMultipartUploadReq(omRequest);
 
     OMRequest modifiedRequest =
         s3InitiateMultipartUploadRequest.preExecute(ozoneManager);
@@ -204,7 +204,7 @@ public class TestS3MultipartRequest {
             keyName, multipartUploadID, partList);
 
     S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest =
-        new S3MultipartUploadCompleteRequest(omRequest);
+            getS3MultipartUploadCompleteReq(omRequest);
 
     OMRequest modifiedRequest =
         s3MultipartUploadCompleteRequest.preExecute(ozoneManager);
@@ -247,6 +247,11 @@ public class TestS3MultipartRequest {
     return modifiedRequest;
   }
 
+  protected S3MultipartUploadCompleteRequest getS3MultipartUploadCompleteReq(
+          OMRequest omRequest) {
+    return new S3MultipartUploadCompleteRequest(omRequest);
+  }
+
   protected S3MultipartUploadCommitPartRequest getS3MultipartUploadCommitReq(
           OMRequest omRequest) {
     return new S3MultipartUploadCommitPartRequest(omRequest);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java
index a04f51f..3d399b1 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.om.request.s3.multipart;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -55,7 +56,7 @@ public class TestS3MultipartUploadCompleteRequest
   public void testValidateAndUpdateCacheSuccess() throws Exception {
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();
-    String keyName = UUID.randomUUID().toString();
+    String keyName = getKeyName();
 
     TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
         omMetadataManager);
@@ -64,7 +65,7 @@ public class TestS3MultipartUploadCompleteRequest
         bucketName, keyName);
 
     S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
-        new S3InitiateMultipartUploadRequest(initiateMPURequest);
+        getS3InitiateMultipartUploadReq(initiateMPURequest);
 
     OMClientResponse omClientResponse =
         s3InitiateMultipartUploadRequest.validateAndUpdateCache(ozoneManager,
@@ -78,27 +79,25 @@ public class TestS3MultipartUploadCompleteRequest
         bucketName, keyName, clientID, multipartUploadID, 1);
 
     S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest =
-        new S3MultipartUploadCommitPartRequest(commitMultipartRequest);
+        getS3MultipartUploadCommitReq(commitMultipartRequest);
 
     // Add key to open key table.
-    TestOMRequestUtils.addKeyToTable(true, volumeName, bucketName,
-        keyName, clientID, HddsProtos.ReplicationType.RATIS,
-        HddsProtos.ReplicationFactor.ONE, omMetadataManager);
+    addKeyToTable(volumeName, bucketName, keyName, clientID);
 
     s3MultipartUploadCommitPartRequest.validateAndUpdateCache(ozoneManager,
         2L, ozoneManagerDoubleBufferHelper);
 
     List<Part> partList = new ArrayList<>();
 
-    partList.add(Part.newBuilder().setPartName(
-        omMetadataManager.getOzoneKey(volumeName, bucketName, keyName) +
-            clientID).setPartNumber(1).build());
+    String partName = getPartName(volumeName, bucketName, keyName, clientID);
+    partList.add(Part.newBuilder().setPartName(partName).setPartNumber(1)
+            .build());
 
     OMRequest completeMultipartRequest = doPreExecuteCompleteMPU(volumeName,
         bucketName, keyName, multipartUploadID, partList);
 
     S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest =
-        new S3MultipartUploadCompleteRequest(completeMultipartRequest);
+        getS3MultipartUploadCompleteReq(completeMultipartRequest);
 
     omClientResponse =
         s3MultipartUploadCompleteRequest.validateAndUpdateCache(ozoneManager,
@@ -107,14 +106,71 @@ public class TestS3MultipartUploadCompleteRequest
     Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
         omClientResponse.getOMResponse().getStatus());
 
-    String multipartKey = omMetadataManager.getMultipartKey(volumeName,
-        bucketName, keyName, multipartUploadID);
+    String multipartKey = getMultipartKey(volumeName, bucketName, keyName,
+            multipartUploadID);
 
     Assert.assertNull(omMetadataManager.getOpenKeyTable().get(multipartKey));
     Assert.assertNull(
         omMetadataManager.getMultipartInfoTable().get(multipartKey));
     Assert.assertNotNull(omMetadataManager.getKeyTable().get(
-        omMetadataManager.getOzoneKey(volumeName, bucketName, keyName)));
+            getOzoneDBKey(volumeName, bucketName, keyName)));
+  }
+
+  @Test
+  public void testInvalidPartOrderError() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = getKeyName();
+
+    TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+            omMetadataManager);
+
+    OMRequest initiateMPURequest = doPreExecuteInitiateMPU(volumeName,
+            bucketName, keyName);
+
+    S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
+            getS3InitiateMultipartUploadReq(initiateMPURequest);
+
+    OMClientResponse omClientResponse =
+            s3InitiateMultipartUploadRequest.validateAndUpdateCache(
+                    ozoneManager, 1L, ozoneManagerDoubleBufferHelper);
+
+    long clientID = Time.now();
+    String multipartUploadID = omClientResponse.getOMResponse()
+            .getInitiateMultiPartUploadResponse().getMultipartUploadID();
+
+    OMRequest commitMultipartRequest = doPreExecuteCommitMPU(volumeName,
+            bucketName, keyName, clientID, multipartUploadID, 1);
+
+    S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest =
+            getS3MultipartUploadCommitReq(commitMultipartRequest);
+
+    // Add key to open key table.
+    addKeyToTable(volumeName, bucketName, keyName, clientID);
+
+    s3MultipartUploadCommitPartRequest.validateAndUpdateCache(ozoneManager,
+            2L, ozoneManagerDoubleBufferHelper);
+
+    List<Part> partList = new ArrayList<>();
+
+    String partName = getPartName(volumeName, bucketName, keyName, clientID);
+    partList.add(Part.newBuilder().setPartName(partName).setPartNumber(23)
+            .build());
+    partList.add(Part.newBuilder().setPartName(partName).setPartNumber(1)
+            .build());
+
+    OMRequest completeMultipartRequest = doPreExecuteCompleteMPU(volumeName,
+            bucketName, keyName, multipartUploadID, partList);
+
+    S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest =
+            getS3MultipartUploadCompleteReq(completeMultipartRequest);
+
+    omClientResponse =
+            s3MultipartUploadCompleteRequest.validateAndUpdateCache(
+                    ozoneManager, 3L, ozoneManagerDoubleBufferHelper);
+
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.INVALID_PART_ORDER,
+            omClientResponse.getOMResponse().getStatus());
   }
 
   @Test
@@ -129,7 +185,7 @@ public class TestS3MultipartUploadCompleteRequest
         bucketName, keyName, UUID.randomUUID().toString(), partList);
 
     S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest =
-        new S3MultipartUploadCompleteRequest(completeMultipartRequest);
+        getS3MultipartUploadCompleteReq(completeMultipartRequest);
 
     OMClientResponse omClientResponse =
         s3MultipartUploadCompleteRequest.validateAndUpdateCache(ozoneManager,
@@ -153,7 +209,7 @@ public class TestS3MultipartUploadCompleteRequest
         bucketName, keyName, UUID.randomUUID().toString(), partList);
 
     S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest =
-        new S3MultipartUploadCompleteRequest(completeMultipartRequest);
+            getS3MultipartUploadCompleteReq(completeMultipartRequest);
 
     OMClientResponse omClientResponse =
         s3MultipartUploadCompleteRequest.validateAndUpdateCache(ozoneManager,
@@ -180,7 +236,7 @@ public class TestS3MultipartUploadCompleteRequest
 
     // Doing  complete multipart upload request with out initiate.
     S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest =
-        new S3MultipartUploadCompleteRequest(completeMultipartRequest);
+            getS3MultipartUploadCompleteReq(completeMultipartRequest);
 
     OMClientResponse omClientResponse =
         s3MultipartUploadCompleteRequest.validateAndUpdateCache(ozoneManager,
@@ -191,5 +247,35 @@ public class TestS3MultipartUploadCompleteRequest
         omClientResponse.getOMResponse().getStatus());
 
   }
+
+  protected void addKeyToTable(String volumeName, String bucketName,
+                             String keyName, long clientID) throws Exception {
+    TestOMRequestUtils.addKeyToTable(true, volumeName, bucketName,
+            keyName, clientID, HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.ONE, omMetadataManager);
+  }
+
+  protected String getMultipartKey(String volumeName, String bucketName,
+      String keyName, String multipartUploadID) throws IOException {
+    return omMetadataManager.getMultipartKey(volumeName,
+            bucketName, keyName, multipartUploadID);
+  }
+
+  private String getPartName(String volumeName, String bucketName,
+      String keyName, long clientID) throws IOException {
+
+    String dbOzoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
+            keyName);
+    return dbOzoneKey + clientID;
+  }
+
+  protected String getOzoneDBKey(String volumeName, String bucketName,
+                                 String keyName) throws IOException {
+    return omMetadataManager.getOzoneKey(volumeName, bucketName, keyName);
+  }
+
+  protected String getKeyName() {
+    return UUID.randomUUID().toString();
+  }
 }
 
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestV1.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestV1.java
new file mode 100644
index 0000000..cd5051f
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestV1.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.s3.multipart;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Iterator;
+import java.util.UUID;
+
+/**
+ * Tests the V1 S3 Multipart Upload Complete request (FS-optimized bucket).
+ */
+public class TestS3MultipartUploadCompleteRequestV1
+    extends TestS3MultipartUploadCompleteRequest {
+
+  @BeforeClass
+  public static void init() {
+    OzoneManagerRatisUtils.setBucketFSOptimized(true);
+  }
+
+  protected String getKeyName() {
+    String parentDir = UUID.randomUUID().toString() + "/a/b/c";
+    String fileName = "file1";
+    String keyName = parentDir + OzoneConsts.OM_KEY_PREFIX + fileName;
+    return keyName;
+  }
+
+  protected void addKeyToTable(String volumeName, String bucketName,
+      String keyName, long clientID) throws Exception {
+    // need to initialize parentID
+    String parentDir = OzoneFSUtils.getParentDir(keyName);
+    Assert.assertNotEquals("Parent doesn't exists!", parentDir, keyName);
+
+    // add parentDir to dirTable
+    long parentID = getParentID(volumeName, bucketName, keyName);
+    long txnId = 50;
+    long objectId = parentID + 1;
+
+    OmKeyInfo omKeyInfoV1 =
+            TestOMRequestUtils.createOmKeyInfo(volumeName, bucketName, keyName,
+                    HddsProtos.ReplicationType.RATIS,
+                    HddsProtos.ReplicationFactor.ONE, objectId, parentID, txnId,
+                    Time.now());
+
+    // add key to openFileTable
+    String fileName = OzoneFSUtils.getFileName(keyName);
+    omKeyInfoV1.setKeyName(fileName);
+    TestOMRequestUtils.addFileToKeyTable(true, false,
+            fileName, omKeyInfoV1, clientID, omKeyInfoV1.getObjectID(),
+            omMetadataManager);
+  }
+
+  protected String getMultipartKey(String volumeName, String bucketName,
+      String keyName, String multipartUploadID) throws IOException {
+    OzoneFileStatus keyStatus = OMFileRequest.getOMKeyInfoIfExists(
+            omMetadataManager, volumeName,
+            bucketName, keyName, 0);
+
+    Assert.assertNotNull("key not found in DB!", keyStatus);
+
+    return omMetadataManager.getMultipartKey(keyStatus.getKeyInfo()
+                    .getParentObjectID(), keyStatus.getTrimmedName(),
+            multipartUploadID);
+  }
+
+  private long getParentID(String volumeName, String bucketName,
+                           String keyName) throws IOException {
+    Path keyPath = Paths.get(keyName);
+    Iterator<Path> elements = keyPath.iterator();
+    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
+    OmBucketInfo omBucketInfo =
+            omMetadataManager.getBucketTable().get(bucketKey);
+
+    return OMFileRequest.getParentID(omBucketInfo.getObjectID(),
+            elements, keyName, omMetadataManager);
+  }
+
+  protected String getOzoneDBKey(String volumeName, String bucketName,
+                                 String keyName) throws IOException {
+    long parentID = getParentID(volumeName, bucketName, keyName);
+    String fileName = OzoneFSUtils.getFileName(keyName);
+    return omMetadataManager.getOzonePathKey(parentID, fileName);
+  }
+
+  protected S3MultipartUploadCompleteRequest getS3MultipartUploadCompleteReq(
+          OMRequest omRequest) {
+    return new S3MultipartUploadCompleteRequestV1(omRequest);
+  }
+
+  protected S3MultipartUploadCommitPartRequest getS3MultipartUploadCommitReq(
+          OMRequest omRequest) {
+    return new S3MultipartUploadCommitPartRequestV1(omRequest);
+  }
+
+  protected S3InitiateMultipartUploadRequest getS3InitiateMultipartUploadReq(
+          OMRequest initiateMPURequest) {
+    return new S3InitiateMultipartUploadRequestV1(initiateMPURequest);
+  }
+
+}
+
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java
index 106ae61..6f4d6fa 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java
@@ -268,6 +268,27 @@ public class TestS3MultipartResponse {
             openPartKeyInfoToBeDeleted, isRatisEnabled, omBucketInfo);
   }
 
+  @SuppressWarnings("checkstyle:ParameterNumber")
+  public S3MultipartUploadCompleteResponse createS3CompleteMPUResponseV1(
+          String volumeName, String bucketName, long parentID, String keyName,
+          String multipartUploadID, OmKeyInfo omKeyInfo,
+          OzoneManagerProtocolProtos.Status status,
+          List<OmKeyInfo> unUsedParts) {
+
+    String multipartKey = getMultipartKey(parentID, keyName, multipartUploadID);
+
+    OMResponse omResponse = OMResponse.newBuilder()
+            .setCmdType(OzoneManagerProtocolProtos.Type.CompleteMultiPartUpload)
+            .setStatus(status).setSuccess(true)
+            .setCompleteMultiPartUploadResponse(
+                    OzoneManagerProtocolProtos.MultipartUploadCompleteResponse
+                            .newBuilder().setBucket(bucketName)
+                            .setVolume(volumeName).setKey(keyName)).build();
+
+    return new S3MultipartUploadCompleteResponseV1(omResponse, multipartKey,
+            omKeyInfo, unUsedParts);
+  }
+
   private String getMultipartKey(long parentID, String keyName,
                                  String multipartUploadID) {
     String fileName = OzoneFSUtils.getFileName(keyName);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadCompleteResponseV1.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadCompleteResponseV1.java
new file mode 100644
index 0000000..2683273
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadCompleteResponseV1.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.s3.multipart;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Tests the V1 multipart upload complete response.
+ */
+public class TestS3MultipartUploadCompleteResponseV1
+    extends TestS3MultipartResponse {
+
+  private String dirName = "a/b/c/";
+
+  private long parentID;
+
+  @Test
+  public void testAddDBToBatch() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = getKeyName();
+    String multipartUploadID = UUID.randomUUID().toString();
+
+    TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+            omMetadataManager);
+
+    long txnId = 50;
+    long objectId = parentID + 1;
+    String fileName = OzoneFSUtils.getFileName(keyName);
+    String dbMultipartKey = omMetadataManager.getMultipartKey(parentID,
+            fileName, multipartUploadID);
+    long clientId = Time.now();
+    String dbOpenKey = omMetadataManager.getOpenFileName(parentID, fileName,
+            clientId);
+    String dbKey = omMetadataManager.getOzonePathKey(parentID, fileName);
+    OmKeyInfo omKeyInfoV1 =
+            TestOMRequestUtils.createOmKeyInfo(volumeName, bucketName, keyName,
+                    HddsProtos.ReplicationType.RATIS,
+                    HddsProtos.ReplicationFactor.ONE, objectId, parentID, txnId,
+                    Time.now());
+
+    // add key to openFileTable
+    omKeyInfoV1.setKeyName(fileName);
+    TestOMRequestUtils.addFileToKeyTable(true, false,
+            fileName, omKeyInfoV1, clientId, omKeyInfoV1.getObjectID(),
+            omMetadataManager);
+
+    addS3MultipartUploadCommitPartResponseV1(volumeName, bucketName, keyName,
+            multipartUploadID, dbOpenKey);
+
+    List<OmKeyInfo> unUsedParts = new ArrayList<>();
+    S3MultipartUploadCompleteResponse s3MultipartUploadCompleteResponse =
+            createS3CompleteMPUResponseV1(volumeName, bucketName, parentID,
+                    keyName, multipartUploadID, omKeyInfoV1,
+                OzoneManagerProtocolProtos.Status.OK, unUsedParts);
+
+    s3MultipartUploadCompleteResponse.addToDBBatch(omMetadataManager,
+        batchOperation);
+
+    omMetadataManager.getStore().commitBatchOperation(batchOperation);
+
+    Assert.assertNotNull(omMetadataManager.getKeyTable().get(dbKey));
+    Assert.assertNull(
+        omMetadataManager.getMultipartInfoTable().get(dbMultipartKey));
+    Assert.assertNull(
+            omMetadataManager.getOpenKeyTable().get(dbMultipartKey));
+
+    // No parts are created, so the deleted table should have no entries.
+    Assert.assertEquals(0, omMetadataManager.countRowsInTable(
+            omMetadataManager.getDeletedTable()));
+  }
+
+  @Test
+  public void testAddDBToBatchWithParts() throws Exception {
+
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = getKeyName();
+
+    TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+            omMetadataManager);
+    createParentPath(volumeName, bucketName);
+
+    String multipartUploadID = UUID.randomUUID().toString();
+
+    int deleteEntryCount = 0;
+
+    String fileName = OzoneFSUtils.getFileName(keyName);
+    String dbMultipartKey = omMetadataManager.getMultipartKey(parentID,
+            fileName, multipartUploadID);
+
+    S3InitiateMultipartUploadResponse s3InitiateMultipartUploadResponseV1 =
+            addS3InitiateMultipartUpload(volumeName, bucketName, keyName,
+                    multipartUploadID);
+
+    // Add some dummy parts for testing.
+    // No key locations are added, as this test only checks whether entries
+    // are added to the deleted table.
+    OmMultipartKeyInfo omMultipartKeyInfo =
+            s3InitiateMultipartUploadResponseV1.getOmMultipartKeyInfo();
+
+    OmKeyInfo omKeyInfoV1 = commitS3MultipartUpload(volumeName, bucketName,
+            keyName, multipartUploadID, fileName, dbMultipartKey,
+            omMultipartKeyInfo);
+    // The commit above adds one entry to the deleted table.
+    deleteEntryCount++;
+
+    OmKeyInfo omKeyInfo =
+            TestOMRequestUtils.createOmKeyInfo(volumeName, bucketName, keyName,
+                    HddsProtos.ReplicationType.RATIS,
+                    HddsProtos.ReplicationFactor.ONE,
+                    parentID + 10,
+                    parentID, 100, Time.now());
+    List<OmKeyInfo> unUsedParts = new ArrayList<>();
+    unUsedParts.add(omKeyInfo);
+    S3MultipartUploadCompleteResponse s3MultipartUploadCompleteResponse =
+            createS3CompleteMPUResponseV1(volumeName, bucketName, parentID,
+                    keyName, multipartUploadID, omKeyInfoV1,
+                    OzoneManagerProtocolProtos.Status.OK, unUsedParts);
+
+    s3MultipartUploadCompleteResponse.addToDBBatch(omMetadataManager,
+            batchOperation);
+
+    omMetadataManager.getStore().commitBatchOperation(batchOperation);
+    String dbKey = omMetadataManager.getOzonePathKey(parentID,
+          omKeyInfoV1.getFileName());
+    Assert.assertNotNull(omMetadataManager.getKeyTable().get(dbKey));
+    Assert.assertNull(
+            omMetadataManager.getMultipartInfoTable().get(dbMultipartKey));
+    Assert.assertNull(
+            omMetadataManager.getOpenKeyTable().get(dbMultipartKey));
+
+    // As 1 unused part exists, 1 more entry should be present in the
+    // deleted table.
+    deleteEntryCount++;
+    Assert.assertEquals(deleteEntryCount, omMetadataManager.countRowsInTable(
+            omMetadataManager.getDeletedTable()));
+  }
+
+  private OmKeyInfo commitS3MultipartUpload(String volumeName,
+      String bucketName, String keyName, String multipartUploadID,
+      String fileName, String multipartKey,
+      OmMultipartKeyInfo omMultipartKeyInfo) throws IOException {
+
+    PartKeyInfo part1 = createPartKeyInfoV1(volumeName, bucketName, parentID,
+        fileName, 1);
+
+    addPart(1, part1, omMultipartKeyInfo);
+
+    long clientId = Time.now();
+    String openKey = omMetadataManager.getOpenFileName(parentID, fileName,
+            clientId);
+
+    S3MultipartUploadCommitPartResponse s3MultipartUploadCommitPartResponse =
+            createS3CommitMPUResponseV1(volumeName, bucketName, parentID,
+                    keyName, multipartUploadID,
+                    omMultipartKeyInfo.getPartKeyInfo(1),
+                    omMultipartKeyInfo,
+                    OzoneManagerProtocolProtos.Status.OK,  openKey);
+
+    s3MultipartUploadCommitPartResponse.checkAndUpdateDB(omMetadataManager,
+            batchOperation);
+
+    Assert.assertNull(omMetadataManager.getOpenKeyTable().get(multipartKey));
+    Assert.assertNull(
+        omMetadataManager.getMultipartInfoTable().get(multipartKey));
+
+    omMetadataManager.getStore().commitBatchOperation(batchOperation);
+
+    // As 1 part is created, 1 entry should be present in the deleted table.
+    Assert.assertEquals(1, omMetadataManager.countRowsInTable(
+        omMetadataManager.getDeletedTable()));
+
+    String part1DeletedKeyName =
+        omMultipartKeyInfo.getPartKeyInfo(1).getPartName();
+
+    Assert.assertNotNull(omMetadataManager.getDeletedTable().get(
+        part1DeletedKeyName));
+
+    RepeatedOmKeyInfo ro =
+        omMetadataManager.getDeletedTable().get(part1DeletedKeyName);
+    OmKeyInfo omPartKeyInfo = OmKeyInfo.getFromProtobuf(part1.getPartKeyInfo());
+    Assert.assertEquals(omPartKeyInfo, ro.getOmKeyInfoList().get(0));
+
+    return omPartKeyInfo;
+  }
+
+  /**
+   * Creates an initiate-MPU response under {@code parentID} and adds it to
+   * the pending batch; the batch itself is not committed here.
+   */
+  private S3InitiateMultipartUploadResponse addS3InitiateMultipartUpload(
+      String volumeName, String bucketName, String keyName,
+      String multipartUploadID) throws IOException {
+    S3InitiateMultipartUploadResponse initiateResponse =
+        createS3InitiateMPUResponseV1(volumeName, bucketName, parentID,
+            keyName, multipartUploadID, new ArrayList<>());
+    initiateResponse.addToDBBatch(omMetadataManager, batchOperation);
+    return initiateResponse;
+  }
+
+  private String getKeyName() { // dirName followed by a random UUID
+    return dirName + UUID.randomUUID();
+  }
+
+  /** Creates the parent dirs for dirName and stores the result in parentID. */
+  private void createParentPath(String volumeName, String bucketName)
+      throws Exception {
+    parentID = TestOMRequestUtils.addParentsToDirTable(
+        volumeName, bucketName, dirName, omMetadataManager);
+  }
+
+  /**
+   * Adds an OK commit-part response to the batch and commits the batch.
+   */
+  private void addS3MultipartUploadCommitPartResponseV1(String volumeName,
+      String bucketName, String keyName, String multipartUploadID,
+      String openKey) throws IOException {
+    S3MultipartUploadCommitPartResponse commitPartResponse =
+        createS3CommitMPUResponseV1(volumeName, bucketName, parentID, keyName,
+            multipartUploadID, null, null,
+            OzoneManagerProtocolProtos.Status.OK, openKey);
+    commitPartResponse.addToDBBatch(omMetadataManager, batchOperation);
+    omMetadataManager.getStore().commitBatchOperation(batchOperation);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org