You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ra...@apache.org on 2021/02/18 03:13:41 UTC
[ozone] 18/18: HDDS-4813. [FSO]S3Multipart: Implement
UploadCompleteRequest (#1923)
This is an automated email from the ASF dual-hosted git repository.
rakeshr pushed a commit to branch HDDS-2939
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 1de47385b16d9d08dd2082fe453ae3d1f07e1bd4
Author: Rakesh Radhakrishnan <ra...@apache.org>
AuthorDate: Wed Feb 17 20:50:05 2021 +0530
HDDS-4813. [FSO]S3Multipart: Implement UploadCompleteRequest (#1923)
---
.../rpc/TestOzoneClientMultipartUploadV1.java | 274 ++++++++++++++++--
.../om/ratis/utils/OzoneManagerRatisUtils.java | 4 +
.../ozone/om/request/file/OMFileRequest.java | 4 +-
.../S3MultipartUploadCommitPartRequestV1.java | 37 ++-
.../S3MultipartUploadCompleteRequest.java | 306 +++++++++++++--------
.../S3MultipartUploadCompleteRequestV1.java | 268 ++++++++++++++++++
.../S3MultipartUploadCompleteResponse.java | 13 +-
...va => S3MultipartUploadCompleteResponseV1.java} | 60 ++--
.../s3/multipart/TestS3MultipartRequest.java | 9 +-
.../TestS3MultipartUploadCompleteRequest.java | 118 ++++++--
.../TestS3MultipartUploadCompleteRequestV1.java | 132 +++++++++
.../s3/multipart/TestS3MultipartResponse.java | 21 ++
.../TestS3MultipartUploadCompleteResponseV1.java | 257 +++++++++++++++++
13 files changed, 1307 insertions(+), 196 deletions(-)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientMultipartUploadV1.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientMultipartUploadV1.java
index af241c5..1ab2cc3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientMultipartUploadV1.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientMultipartUploadV1.java
@@ -17,24 +17,29 @@
package org.apache.hadoop.ozone.client.rpc;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneTestUtils;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.StringUtils.string2Bytes;
import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -44,10 +49,15 @@ import org.junit.Test;
import org.junit.rules.Timeout;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.TreeMap;
import java.util.UUID;
import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
import static org.apache.hadoop.hdds.client.ReplicationType.STAND_ALONE;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR;
/**
* This test verifies all the S3 multipart client apis - layout version V1.
@@ -133,24 +143,24 @@ public class TestOzoneClientMultipartUploadV1 {
OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
STAND_ALONE, ONE);
- assertNotNull(multipartInfo);
+ Assert.assertNotNull(multipartInfo);
String uploadID = multipartInfo.getUploadID();
Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
Assert.assertEquals(bucketName, multipartInfo.getBucketName());
Assert.assertEquals(keyName, multipartInfo.getKeyName());
- assertNotNull(multipartInfo.getUploadID());
+ Assert.assertNotNull(multipartInfo.getUploadID());
// Call initiate multipart upload for the same key again, this should
// generate a new uploadID.
multipartInfo = bucket.initiateMultipartUpload(keyName,
STAND_ALONE, ONE);
- assertNotNull(multipartInfo);
+ Assert.assertNotNull(multipartInfo);
Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
Assert.assertEquals(bucketName, multipartInfo.getBucketName());
Assert.assertEquals(keyName, multipartInfo.getKeyName());
- assertNotEquals(multipartInfo.getUploadID(), uploadID);
- assertNotNull(multipartInfo.getUploadID());
+ Assert.assertNotEquals(multipartInfo.getUploadID(), uploadID);
+ Assert.assertNotNull(multipartInfo.getUploadID());
}
@Test
@@ -166,23 +176,23 @@ public class TestOzoneClientMultipartUploadV1 {
OzoneBucket bucket = volume.getBucket(bucketName);
OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName);
- assertNotNull(multipartInfo);
+ Assert.assertNotNull(multipartInfo);
String uploadID = multipartInfo.getUploadID();
Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
Assert.assertEquals(bucketName, multipartInfo.getBucketName());
Assert.assertEquals(keyName, multipartInfo.getKeyName());
- assertNotNull(multipartInfo.getUploadID());
+ Assert.assertNotNull(multipartInfo.getUploadID());
// Call initiate multipart upload for the same key again, this should
// generate a new uploadID.
multipartInfo = bucket.initiateMultipartUpload(keyName);
- assertNotNull(multipartInfo);
+ Assert.assertNotNull(multipartInfo);
Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
Assert.assertEquals(bucketName, multipartInfo.getBucketName());
Assert.assertEquals(keyName, multipartInfo.getKeyName());
- assertNotEquals(multipartInfo.getUploadID(), uploadID);
- assertNotNull(multipartInfo.getUploadID());
+ Assert.assertNotEquals(multipartInfo.getUploadID(), uploadID);
+ Assert.assertNotNull(multipartInfo.getUploadID());
}
@Test
@@ -199,12 +209,12 @@ public class TestOzoneClientMultipartUploadV1 {
OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
STAND_ALONE, ONE);
- assertNotNull(multipartInfo);
+ Assert.assertNotNull(multipartInfo);
String uploadID = multipartInfo.getUploadID();
Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
Assert.assertEquals(bucketName, multipartInfo.getBucketName());
Assert.assertEquals(keyName, multipartInfo.getKeyName());
- assertNotNull(multipartInfo.getUploadID());
+ Assert.assertNotNull(multipartInfo.getUploadID());
OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName,
sampleData.length(), 1, uploadID);
@@ -214,8 +224,8 @@ public class TestOzoneClientMultipartUploadV1 {
OmMultipartCommitUploadPartInfo commitUploadPartInfo = ozoneOutputStream
.getCommitUploadPartInfo();
- assertNotNull(commitUploadPartInfo);
- assertNotNull(commitUploadPartInfo.getPartName());
+ Assert.assertNotNull(commitUploadPartInfo);
+ Assert.assertNotNull(commitUploadPartInfo.getPartName());
}
@Test
@@ -233,12 +243,12 @@ public class TestOzoneClientMultipartUploadV1 {
OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
ReplicationType.RATIS, THREE);
- assertNotNull(multipartInfo);
+ Assert.assertNotNull(multipartInfo);
String uploadID = multipartInfo.getUploadID();
Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
Assert.assertEquals(bucketName, multipartInfo.getBucketName());
Assert.assertEquals(keyName, multipartInfo.getKeyName());
- assertNotNull(multipartInfo.getUploadID());
+ Assert.assertNotNull(multipartInfo.getUploadID());
int partNumber = 1;
@@ -250,9 +260,9 @@ public class TestOzoneClientMultipartUploadV1 {
OmMultipartCommitUploadPartInfo commitUploadPartInfo = ozoneOutputStream
.getCommitUploadPartInfo();
- assertNotNull(commitUploadPartInfo);
+ Assert.assertNotNull(commitUploadPartInfo);
String partName = commitUploadPartInfo.getPartName();
- assertNotNull(commitUploadPartInfo.getPartName());
+ Assert.assertNotNull(commitUploadPartInfo.getPartName());
//Overwrite the part by creating part key with same part number.
sampleData = "sample Data Changed";
@@ -264,12 +274,230 @@ public class TestOzoneClientMultipartUploadV1 {
commitUploadPartInfo = ozoneOutputStream
.getCommitUploadPartInfo();
- assertNotNull(commitUploadPartInfo);
- assertNotNull(commitUploadPartInfo.getPartName());
+ Assert.assertNotNull(commitUploadPartInfo);
+ Assert.assertNotNull(commitUploadPartInfo.getPartName());
// PartName should be different from old part Name.
- assertNotEquals("Part names should be different", partName,
+ Assert.assertNotEquals("Part names should be different", partName,
commitUploadPartInfo.getPartName());
}
+ @Test
+ public void testMultipartUploadWithPartsLessThanMinSize() throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ String keyName = UUID.randomUUID().toString();
+
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
+ // Initiate multipart upload
+ String uploadID = initiateMultipartUpload(bucket, keyName, STAND_ALONE,
+ ONE);
+
+ // Upload Parts
+ Map<Integer, String> partsMap = new TreeMap<>();
+ // Uploading part 1 with less than min size
+ String partName = uploadPart(bucket, keyName, uploadID, 1,
+ "data".getBytes(UTF_8));
+ partsMap.put(1, partName);
+
+ partName = uploadPart(bucket, keyName, uploadID, 2,
+ "data".getBytes(UTF_8));
+ partsMap.put(2, partName);
+
+ // Complete multipart upload
+ OzoneTestUtils.expectOmException(OMException.ResultCodes.ENTITY_TOO_SMALL,
+ () -> completeMultipartUpload(bucket, keyName, uploadID, partsMap));
+ }
+
+ @Test
+ public void testMultipartUploadWithPartsMisMatchWithListSizeDifferent()
+ throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ String keyName = UUID.randomUUID().toString();
+
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
+ String uploadID = initiateMultipartUpload(bucket, keyName, STAND_ALONE,
+ ONE);
+
+ // We have not uploaded any parts, but passing a non-empty parts list
+ // should throw an error.
+ TreeMap<Integer, String> partsMap = new TreeMap<>();
+ partsMap.put(1, UUID.randomUUID().toString());
+
+ OzoneTestUtils.expectOmException(OMException.ResultCodes.INVALID_PART,
+ () -> completeMultipartUpload(bucket, keyName, uploadID, partsMap));
+ }
+
+ @Test
+ public void testMultipartUploadWithPartsMisMatchWithIncorrectPartName()
+ throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ String keyName = UUID.randomUUID().toString();
+
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
+ String uploadID = initiateMultipartUpload(bucket, keyName, STAND_ALONE,
+ ONE);
+
+ uploadPart(bucket, keyName, uploadID, 1, "data".getBytes(UTF_8));
+
+ // passing with an incorrect part name, should throw INVALID_PART error.
+ TreeMap<Integer, String> partsMap = new TreeMap<>();
+ partsMap.put(1, UUID.randomUUID().toString());
+
+ OzoneTestUtils.expectOmException(OMException.ResultCodes.INVALID_PART,
+ () -> completeMultipartUpload(bucket, keyName, uploadID, partsMap));
+ }
+
+ @Test
+ public void testMultipartUploadWithMissingParts() throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ String keyName = UUID.randomUUID().toString();
+
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
+ String uploadID = initiateMultipartUpload(bucket, keyName, STAND_ALONE,
+ ONE);
+
+ uploadPart(bucket, keyName, uploadID, 1, "data".getBytes(UTF_8));
+
+ // passing with an incorrect part number, should throw INVALID_PART error.
+ TreeMap<Integer, String> partsMap = new TreeMap<>();
+ partsMap.put(3, "random");
+
+ OzoneTestUtils.expectOmException(OMException.ResultCodes.INVALID_PART,
+ () -> completeMultipartUpload(bucket, keyName, uploadID, partsMap));
+ }
+
+ @Test
+ public void testCommitPartAfterCompleteUpload() throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ String parentDir = "a/b/c/d/";
+ String keyName = parentDir + UUID.randomUUID().toString();
+
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
+ OmMultipartInfo omMultipartInfo = bucket.initiateMultipartUpload(keyName,
+ STAND_ALONE, ONE);
+
+ Assert.assertNotNull(omMultipartInfo.getUploadID());
+
+ String uploadID = omMultipartInfo.getUploadID();
+
+ // upload part 1.
+ byte[] data = generateData(5 * 1024 * 1024,
+ (byte) RandomUtils.nextLong());
+ OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName,
+ data.length, 1, uploadID);
+ ozoneOutputStream.write(data, 0, data.length);
+ ozoneOutputStream.close();
+
+ OmMultipartCommitUploadPartInfo omMultipartCommitUploadPartInfo =
+ ozoneOutputStream.getCommitUploadPartInfo();
+
+ // Do not close output stream for part 2.
+ ozoneOutputStream = bucket.createMultipartKey(keyName,
+ data.length, 2, omMultipartInfo.getUploadID());
+ ozoneOutputStream.write(data, 0, data.length);
+
+ Map<Integer, String> partsMap = new LinkedHashMap<>();
+ partsMap.put(1, omMultipartCommitUploadPartInfo.getPartName());
+ OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo =
+ bucket.completeMultipartUpload(keyName,
+ uploadID, partsMap);
+ Assert.assertNotNull(omMultipartUploadCompleteInfo);
+
+ Assert.assertNotNull(omMultipartCommitUploadPartInfo);
+
+ byte[] fileContent = new byte[data.length];
+ OzoneInputStream inputStream = bucket.readKey(keyName);
+ inputStream.read(fileContent);
+ StringBuilder sb = new StringBuilder(data.length);
+
+ // Combine all parts' data and check whether it matches the data read
+ // back from the key.
+ String part1 = new String(data, UTF_8);
+ sb.append(part1);
+ Assert.assertEquals(sb.toString(), new String(fileContent, UTF_8));
+
+ try {
+ ozoneOutputStream.close();
+ Assert.fail("testCommitPartAfterCompleteUpload failed");
+ } catch (IOException ex) {
+ Assert.assertTrue(ex instanceof OMException);
+ Assert.assertEquals(NO_SUCH_MULTIPART_UPLOAD_ERROR,
+ ((OMException) ex).getResult());
+ }
+ }
+
+ private String initiateMultipartUpload(OzoneBucket bucket, String keyName,
+ ReplicationType replicationType, ReplicationFactor replicationFactor)
+ throws Exception {
+ OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
+ replicationType, replicationFactor);
+
+ String uploadID = multipartInfo.getUploadID();
+ Assert.assertNotNull(uploadID);
+
+ return uploadID;
+ }
+
+ private String uploadPart(OzoneBucket bucket, String keyName, String
+ uploadID, int partNumber, byte[] data) throws Exception {
+
+ OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName,
+ data.length, partNumber, uploadID);
+ ozoneOutputStream.write(data, 0,
+ data.length);
+ ozoneOutputStream.close();
+
+ OmMultipartCommitUploadPartInfo omMultipartCommitUploadPartInfo =
+ ozoneOutputStream.getCommitUploadPartInfo();
+
+ Assert.assertNotNull(omMultipartCommitUploadPartInfo);
+ Assert.assertNotNull(omMultipartCommitUploadPartInfo.getPartName());
+
+ return omMultipartCommitUploadPartInfo.getPartName();
+ }
+
+ private void completeMultipartUpload(OzoneBucket bucket, String keyName,
+ String uploadID, Map<Integer, String> partsMap) throws Exception {
+ OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo = bucket
+ .completeMultipartUpload(keyName, uploadID, partsMap);
+
+ Assert.assertNotNull(omMultipartUploadCompleteInfo);
+ Assert.assertEquals(omMultipartUploadCompleteInfo.getBucket(), bucket
+ .getName());
+ Assert.assertEquals(omMultipartUploadCompleteInfo.getVolume(), bucket
+ .getVolumeName());
+ Assert.assertEquals(omMultipartUploadCompleteInfo.getKey(), keyName);
+ Assert.assertNotNull(omMultipartUploadCompleteInfo.getHash());
+ }
+
+ private byte[] generateData(int size, byte val) {
+ byte[] chars = new byte[size];
+ Arrays.fill(chars, val);
+ return chars;
+ }
+
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index 40a5396..2b39b8d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadAbortReq
import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCommitPartRequest;
import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCommitPartRequestV1;
import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCompleteRequest;
+import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCompleteRequestV1;
import org.apache.hadoop.ozone.om.request.s3.security.S3GetSecretRequest;
import org.apache.hadoop.ozone.om.request.security.OMCancelDelegationTokenRequest;
import org.apache.hadoop.ozone.om.request.security.OMGetDelegationTokenRequest;
@@ -196,6 +197,9 @@ public final class OzoneManagerRatisUtils {
case AbortMultiPartUpload:
return new S3MultipartUploadAbortRequest(omRequest);
case CompleteMultiPartUpload:
+ if (isBucketFSOptimized()) {
+ return new S3MultipartUploadCompleteRequestV1(omRequest);
+ }
return new S3MultipartUploadCompleteRequest(omRequest);
case AddAcl:
case RemoveAcl:
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
index 7f2d2c5..ebf86ce 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
@@ -542,9 +542,10 @@ public final class OMFileRequest {
* @param omMetadataMgr
* @param batchOp
* @param omFileInfo
+ * @return db file key
* @throws IOException
*/
- public static void addToFileTable(OMMetadataManager omMetadataMgr,
+ public static String addToFileTable(OMMetadataManager omMetadataMgr,
BatchOperation batchOp,
OmKeyInfo omFileInfo)
throws IOException {
@@ -554,6 +555,7 @@ public final class OMFileRequest {
omMetadataMgr.getKeyTable().putWithBatch(batchOp,
dbFileKey, omFileInfo);
+ return dbFileKey;
}
/**
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequestV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequestV1.java
index 5546010..43285b4 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequestV1.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequestV1.java
@@ -86,7 +86,8 @@ public class S3MultipartUploadCommitPartRequestV1
boolean acquiredLock = false;
IOException exception = null;
- String partName = null;
+ String dbPartName;
+ String fullKeyPartName = null;
OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
getOmRequest());
OMClientResponse omClientResponse = null;
@@ -95,8 +96,8 @@ public class S3MultipartUploadCommitPartRequestV1
OmKeyInfo omKeyInfo = null;
String multipartKey = null;
OmMultipartKeyInfo multipartKeyInfo = null;
- Result result = null;
- OmBucketInfo omBucketInfo = null;
+ Result result;
+ OmBucketInfo omBucketInfo;
OmBucketInfo copyBucketInfo = null;
try {
keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
@@ -147,8 +148,18 @@ public class S3MultipartUploadCommitPartRequestV1
// Set the UpdateID to current transactionLogIndex
omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
- String ozoneKey = omMetadataManager.getOzonePathKey(parentID, fileName);
- partName = ozoneKey + clientID;
+ /**
+ * Format of PartName stored into MultipartInfoTable is,
+ * "fileName + ClientID".
+ *
+ * Contract is that all part names present in a multipart info will
+ * have same key prefix path.
+ *
+ * For example:
+ * /vol1/buck1/a/b/c/part-1, /vol1/buck1/a/b/c/part-2,
+ * /vol1/buck1/a/b/c/part-n
+ */
+ dbPartName = fileName + clientID;
if (multipartKeyInfo == null) {
// This can occur when user started uploading part by the time commit
@@ -168,9 +179,9 @@ public class S3MultipartUploadCommitPartRequestV1
// Build this multipart upload part info.
OzoneManagerProtocolProtos.PartKeyInfo.Builder partKeyInfo =
OzoneManagerProtocolProtos.PartKeyInfo.newBuilder();
- partKeyInfo.setPartName(partName);
+ partKeyInfo.setPartName(dbPartName);
partKeyInfo.setPartNumber(partNumber);
- partKeyInfo.setPartKeyInfo(omKeyInfo.getProtobuf(
+ partKeyInfo.setPartKeyInfo(omKeyInfo.getProtobuf(fileName,
getOmRequest().getVersion()));
// Add this part information in to multipartKeyInfo.
@@ -207,9 +218,15 @@ public class S3MultipartUploadCommitPartRequestV1
keyArgs.getKeyLocationsList().size() * scmBlockSize * factor;
omBucketInfo.incrUsedBytes(correctedSpace);
+ // Prepare response. Sets the user-given full key part name in the
+ // 'partName' attribute of the response object.
+ String fullOzoneKeyName = omMetadataManager.getOzoneKey(
+ volumeName, bucketName, keyName);
+ fullKeyPartName = fullOzoneKeyName + clientID;
omResponse.setCommitMultiPartUploadResponse(
MultipartCommitUploadPartResponse.newBuilder()
- .setPartName(partName));
+ .setPartName(fullKeyPartName));
+
omClientResponse = new S3MultipartUploadCommitPartResponseV1(
omResponse.build(), multipartKey, openKey,
multipartKeyInfo, oldPartKeyInfo, omKeyInfo,
@@ -234,8 +251,8 @@ public class S3MultipartUploadCommitPartRequestV1
}
logResult(ozoneManager, multipartCommitUploadPartRequest, keyArgs,
- auditMap, volumeName, bucketName, keyName, exception, partName,
- result);
+ auditMap, volumeName, bucketName, keyName, exception,
+ fullKeyPartName, result);
return omClientResponse;
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
index 6d64b54..3c2d5a6 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.audit.OMAction;
@@ -36,7 +37,9 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -57,6 +60,7 @@ import org.apache.commons.codec.digest.DigestUtils;
import static org.apache.hadoop.ozone.OzoneConsts.OM_MULTIPART_MIN_SIZE;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -170,119 +174,21 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
}
// First Check for Invalid Part Order.
- int prevPartNumber = partsList.get(0).getPartNumber();
List< Integer > partNumbers = new ArrayList<>();
- int partsListSize = partsList.size();
- partNumbers.add(prevPartNumber);
- for (int i = 1; i < partsListSize; i++) {
- int currentPartNumber = partsList.get(i).getPartNumber();
- if (prevPartNumber >= currentPartNumber) {
- LOG.error("PartNumber at index {} is {}, and its previous " +
- "partNumber at index {} is {} for ozonekey is " +
- "{}", i, currentPartNumber, i - 1, prevPartNumber,
- ozoneKey);
- throw new OMException(
- failureMessage(requestedVolume, requestedBucket, keyName) +
- " because parts are in Invalid order.",
- OMException.ResultCodes.INVALID_PART_ORDER);
- }
- prevPartNumber = currentPartNumber;
- partNumbers.add(prevPartNumber);
- }
-
+ int partsListSize = getPartsListSize(requestedVolume,
+ requestedBucket, keyName, ozoneKey, partNumbers, partsList);
List<OmKeyLocationInfo> partLocationInfos = new ArrayList<>();
- long dataSize = 0;
- int currentPartCount = 0;
- // Now do actual logic, and check for any Invalid part during this.
- for (OzoneManagerProtocolProtos.Part part : partsList) {
- currentPartCount++;
- int partNumber = part.getPartNumber();
- String partName = part.getPartName();
-
- PartKeyInfo partKeyInfo = partKeyInfoMap.get(partNumber);
-
- if (partKeyInfo == null ||
- !partName.equals(partKeyInfo.getPartName())) {
- String omPartName = partKeyInfo == null ? null :
- partKeyInfo.getPartName();
- throw new OMException(
- failureMessage(requestedVolume, requestedBucket, keyName) +
- ". Provided Part info is { " + partName + ", " + partNumber +
- "}, whereas OM has partName " + omPartName,
- OMException.ResultCodes.INVALID_PART);
- }
-
- OmKeyInfo currentPartKeyInfo = OmKeyInfo
- .getFromProtobuf(partKeyInfo.getPartKeyInfo());
-
- // Except for last part all parts should have minimum size.
- if (currentPartCount != partsListSize) {
- if (currentPartKeyInfo.getDataSize() < OM_MULTIPART_MIN_SIZE) {
- LOG.error("MultipartUpload: {} Part number: {} size {} is less "
- + "than minimum part size {}", ozoneKey,
- partKeyInfo.getPartNumber(),
- currentPartKeyInfo.getDataSize(),
- OzoneConsts.OM_MULTIPART_MIN_SIZE);
- throw new OMException(
- failureMessage(requestedVolume, requestedBucket, keyName) +
- ". Entity too small.",
- OMException.ResultCodes.ENTITY_TOO_SMALL);
- }
- }
-
- // As all part keys will have only one version.
- OmKeyLocationInfoGroup currentKeyInfoGroup = currentPartKeyInfo
- .getKeyLocationVersions().get(0);
- partLocationInfos.addAll(currentKeyInfoGroup.getLocationList());
- dataSize += currentPartKeyInfo.getDataSize();
- }
+ long dataSize = getMultipartDataSize(requestedVolume, requestedBucket,
+ keyName, ozoneKey, partKeyInfoMap, partsListSize,
+ partLocationInfos, partsList, omMetadataManager);
// All parts have same replication information. Here getting from last
// part.
- HddsProtos.ReplicationType type = partKeyInfoMap.lastEntry().getValue()
- .getPartKeyInfo().getType();
- HddsProtos.ReplicationFactor factor =
- partKeyInfoMap.lastEntry().getValue().getPartKeyInfo().getFactor();
-
- OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(ozoneKey);
- if (omKeyInfo == null) {
- // This is a newly added key, it does not have any versions.
- OmKeyLocationInfoGroup keyLocationInfoGroup = new
- OmKeyLocationInfoGroup(0, partLocationInfos);
-
- // Get the objectID of the key from OpenKeyTable
- OmKeyInfo dbOpenKeyInfo = omMetadataManager.getOpenKeyTable()
- .get(multipartKey);
-
- // A newly created key, this is the first version.
- OmKeyInfo.Builder builder =
- new OmKeyInfo.Builder().setVolumeName(volumeName)
- .setBucketName(bucketName).setKeyName(keyName)
- .setReplicationFactor(factor).setReplicationType(type)
- .setCreationTime(keyArgs.getModificationTime())
- .setModificationTime(keyArgs.getModificationTime())
- .setDataSize(dataSize)
- .setOmKeyLocationInfos(
- Collections.singletonList(keyLocationInfoGroup))
- .setAcls(OzoneAclUtil.fromProtobuf(keyArgs.getAclsList()));
- // Check if db entry has ObjectID. This check is required because
- // it is possible that between multipart key uploads and complete,
- // we had an upgrade.
- if (dbOpenKeyInfo.getObjectID() != 0) {
- builder.setObjectID(dbOpenKeyInfo.getObjectID());
- }
- omKeyInfo = builder.build();
- } else {
- // Already a version exists, so we should add it as a new version.
- // But now as versioning is not supported, just following the commit
- // key approach. When versioning support comes, then we can uncomment
- // below code keyInfo.addNewVersion(locations);
- omKeyInfo.updateLocationInfoList(partLocationInfos);
- omKeyInfo.setModificationTime(keyArgs.getModificationTime());
- omKeyInfo.setDataSize(dataSize);
- }
- omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
+ OmKeyInfo omKeyInfo = getOmKeyInfo(ozoneManager, trxnLogIndex, keyArgs,
+ volumeName, bucketName, keyName, multipartKey,
+ omMetadataManager, ozoneKey, partKeyInfoMap, partLocationInfos,
+ dataSize);
//Find all unused parts.
List< OmKeyInfo > unUsedParts = new ArrayList<>();
@@ -329,6 +235,19 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
}
}
+ logResult(ozoneManager, multipartUploadCompleteRequest, partsList,
+ auditMap, volumeName, bucketName, keyName, exception, result);
+
+ return omClientResponse;
+ }
+
+ @SuppressWarnings("checkstyle:ParameterNumber")
+ protected void logResult(OzoneManager ozoneManager,
+ MultipartUploadCompleteRequest multipartUploadCompleteRequest,
+ List<OzoneManagerProtocolProtos.Part> partsList,
+ Map<String, String> auditMap, String volumeName,
+ String bucketName, String keyName, IOException exception,
+ Result result) {
auditMap.put(OzoneConsts.MULTIPART_LIST, partsList.toString());
// audit log
@@ -351,8 +270,177 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
LOG.error("Unrecognized Result for S3MultipartUploadCommitRequest: {}",
multipartUploadCompleteRequest);
}
+ }
- return omClientResponse;
+ @SuppressWarnings("checkstyle:ParameterNumber")
+ protected OmKeyInfo getOmKeyInfo(OzoneManager ozoneManager, long trxnLogIndex,
+ KeyArgs keyArgs, String volumeName, String bucketName, String keyName,
+ String multipartKey, OMMetadataManager omMetadataManager,
+ String ozoneKey, TreeMap<Integer, PartKeyInfo> partKeyInfoMap,
+ List<OmKeyLocationInfo> partLocationInfos, long dataSize)
+ throws IOException {
+ HddsProtos.ReplicationType type = partKeyInfoMap.lastEntry().getValue()
+ .getPartKeyInfo().getType();
+ HddsProtos.ReplicationFactor factor =
+ partKeyInfoMap.lastEntry().getValue().getPartKeyInfo().getFactor();
+
+ OmKeyInfo omKeyInfo = getOmKeyInfoFromKeyTable(ozoneKey, keyName,
+ omMetadataManager);
+ if (omKeyInfo == null) {
+ // This is a newly added key, it does not have any versions.
+ OmKeyLocationInfoGroup keyLocationInfoGroup = new
+ OmKeyLocationInfoGroup(0, partLocationInfos);
+
+ // Get the objectID of the key from OpenKeyTable
+ OmKeyInfo dbOpenKeyInfo = getOmKeyInfoFromOpenKeyTable(multipartKey,
+ keyName, omMetadataManager);
+
+ // A newly created key, this is the first version.
+ OmKeyInfo.Builder builder =
+ new OmKeyInfo.Builder().setVolumeName(volumeName)
+ .setBucketName(bucketName).setKeyName(dbOpenKeyInfo.getKeyName())
+ .setReplicationFactor(factor).setReplicationType(type)
+ .setCreationTime(keyArgs.getModificationTime())
+ .setModificationTime(keyArgs.getModificationTime())
+ .setDataSize(dataSize)
+ .setOmKeyLocationInfos(
+ Collections.singletonList(keyLocationInfoGroup))
+ .setAcls(OzoneAclUtil.fromProtobuf(keyArgs.getAclsList()));
+ // Check if db entry has ObjectID. This check is required because
+ // it is possible that between multipart key uploads and complete,
+ // we had an upgrade.
+ if (dbOpenKeyInfo.getObjectID() != 0) {
+ builder.setObjectID(dbOpenKeyInfo.getObjectID());
+ }
+ updatePrefixFSOInfo(dbOpenKeyInfo, builder);
+ omKeyInfo = builder.build();
+ } else {
+ // Already a version exists, so we should add it as a new version.
+ // But now as versioning is not supported, just following the commit
+ // key approach. When versioning support comes, then we can uncomment
+ // below code keyInfo.addNewVersion(locations);
+ omKeyInfo.updateLocationInfoList(partLocationInfos);
+ omKeyInfo.setModificationTime(keyArgs.getModificationTime());
+ omKeyInfo.setDataSize(dataSize);
+ }
+ omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
+ return omKeyInfo;
+ }
+
+ protected void updatePrefixFSOInfo(OmKeyInfo dbOpenKeyInfo,
+ OmKeyInfo.Builder builder) {
+ // Bucket is not FS-optimized (FSO layout disabled), so there is nothing to update.
+ }
+
+ protected OmKeyInfo getOmKeyInfoFromKeyTable(String dbOzoneKey,
+ String keyName, OMMetadataManager omMetadataManager) throws IOException {
+ return omMetadataManager.getKeyTable().get(dbOzoneKey);
+ }
+
+ protected OmKeyInfo getOmKeyInfoFromOpenKeyTable(String dbMultipartKey,
+ String keyName, OMMetadataManager omMetadataManager) throws IOException {
+ return omMetadataManager.getOpenKeyTable().get(dbMultipartKey);
+ }
+
+ protected int getPartsListSize(String requestedVolume,
+ String requestedBucket, String keyName, String ozoneKey,
+ List<Integer> partNumbers,
+ List<OzoneManagerProtocolProtos.Part> partsList) throws OMException {
+ int prevPartNumber = partsList.get(0).getPartNumber();
+ int partsListSize = partsList.size();
+ partNumbers.add(prevPartNumber);
+ for (int i = 1; i < partsListSize; i++) {
+ int currentPartNumber = partsList.get(i).getPartNumber();
+ if (prevPartNumber >= currentPartNumber) {
+ LOG.error("PartNumber at index {} is {}, and its previous " +
+ "partNumber at index {} is {} for ozonekey is " +
+ "{}", i, currentPartNumber, i - 1, prevPartNumber,
+ ozoneKey);
+ throw new OMException(
+ failureMessage(requestedVolume, requestedBucket, keyName) +
+ " because parts are in Invalid order.",
+ OMException.ResultCodes.INVALID_PART_ORDER);
+ }
+ prevPartNumber = currentPartNumber;
+ partNumbers.add(prevPartNumber);
+ }
+ return partsListSize;
+ }
+
+ @SuppressWarnings("checkstyle:ParameterNumber")
+ protected long getMultipartDataSize(String requestedVolume,
+ String requestedBucket, String keyName, String ozoneKey,
+ TreeMap<Integer, PartKeyInfo> partKeyInfoMap,
+ int partsListSize, List<OmKeyLocationInfo> partLocationInfos,
+ List<OzoneManagerProtocolProtos.Part> partsList,
+ OMMetadataManager omMetadataManager) throws OMException {
+ long dataSize = 0;
+ int currentPartCount = 0;
+ // Now do actual logic, and check for any Invalid part during this.
+ for (OzoneManagerProtocolProtos.Part part : partsList) {
+ currentPartCount++;
+ int partNumber = part.getPartNumber();
+ String partName = part.getPartName();
+
+ PartKeyInfo partKeyInfo = partKeyInfoMap.get(partNumber);
+
+ String dbPartName = null;
+ if (partKeyInfo != null) {
+ dbPartName = preparePartName(requestedVolume, requestedBucket, keyName,
+ partKeyInfo, omMetadataManager);
+ }
+ if (!StringUtils.equals(partName, dbPartName)) {
+ String omPartName = partKeyInfo == null ? null : dbPartName;
+ throw new OMException(
+ failureMessage(requestedVolume, requestedBucket, keyName) +
+ ". Provided Part info is { " + partName + ", " + partNumber +
+ "}, whereas OM has partName " + omPartName,
+ OMException.ResultCodes.INVALID_PART);
+ }
+
+ OmKeyInfo currentPartKeyInfo = OmKeyInfo
+ .getFromProtobuf(partKeyInfo.getPartKeyInfo());
+
+ // Every part except the last one must meet the minimum part size.
+ if (currentPartCount != partsListSize) {
+ if (currentPartKeyInfo.getDataSize() < OM_MULTIPART_MIN_SIZE) {
+ LOG.error("MultipartUpload: {} Part number: {} size {} is less "
+ + "than minimum part size {}", ozoneKey,
+ partKeyInfo.getPartNumber(),
+ currentPartKeyInfo.getDataSize(),
+ OzoneConsts.OM_MULTIPART_MIN_SIZE);
+ throw new OMException(
+ failureMessage(requestedVolume, requestedBucket, keyName) +
+ ". Entity too small.",
+ OMException.ResultCodes.ENTITY_TOO_SMALL);
+ }
+ }
+
+ // As all part keys will have only one version.
+ OmKeyLocationInfoGroup currentKeyInfoGroup = currentPartKeyInfo
+ .getKeyLocationVersions().get(0);
+ partLocationInfos.addAll(currentKeyInfoGroup.getLocationList());
+ dataSize += currentPartKeyInfo.getDataSize();
+ }
+ return dataSize;
+ }
+
+ private String preparePartName(String requestedVolume,
+ String requestedBucket, String keyName, PartKeyInfo partKeyInfo,
+ OMMetadataManager omMetadataManager) {
+
+ String partName;
+ if (OzoneManagerRatisUtils.isBucketFSOptimized()) {
+ String parentPath = OzoneFSUtils.getParent(keyName);
+ StringBuffer keyPath = new StringBuffer(parentPath);
+ keyPath.append(partKeyInfo.getPartName());
+
+ partName = omMetadataManager.getOzoneKey(requestedVolume,
+ requestedBucket, keyPath.toString());
+ } else {
+ partName = partKeyInfo.getPartName();
+ }
+ return partName;
}
private static String failureMessage(String volume, String bucket,
@@ -361,7 +449,7 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
volume + " bucket: " + bucket + " key: " + keyName;
}
- private void updateCache(OMMetadataManager omMetadataManager,
+ protected void updateCache(OMMetadataManager omMetadataManager,
String ozoneKey, String multipartKey, OmKeyInfo omKeyInfo,
long transactionLogIndex) {
// Update cache.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequestV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequestV1.java
new file mode 100644
index 0000000..5b84e53
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequestV1.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.s3.multipart;
+
+import com.google.common.base.Optional;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.s3.multipart.S3MultipartUploadCompleteResponseV1;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadCompleteRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadCompleteResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+import static org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.DIRECTORY_EXISTS;
+
+/**
+ * Handle Multipart upload complete request.
+ */
+public class S3MultipartUploadCompleteRequestV1
+ extends S3MultipartUploadCompleteRequest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(S3MultipartUploadCompleteRequestV1.class);
+
+ public S3MultipartUploadCompleteRequestV1(OMRequest omRequest) {
+ super(omRequest);
+ }
+
+ @Override
+ @SuppressWarnings("methodlength")
+ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+ long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
+ MultipartUploadCompleteRequest multipartUploadCompleteRequest =
+ getOmRequest().getCompleteMultiPartUploadRequest();
+
+ KeyArgs keyArgs = multipartUploadCompleteRequest.getKeyArgs();
+
+ List<OzoneManagerProtocolProtos.Part> partsList =
+ multipartUploadCompleteRequest.getPartsListList();
+ Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
+
+ String volumeName = keyArgs.getVolumeName();
+ String bucketName = keyArgs.getBucketName();
+ final String requestedVolume = volumeName;
+ final String requestedBucket = bucketName;
+ String keyName = keyArgs.getKeyName();
+ String uploadID = keyArgs.getMultipartUploadID();
+ String dbMultipartKey;
+
+ ozoneManager.getMetrics().incNumCompleteMultipartUploads();
+
+ OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+
+ boolean acquiredLock = false;
+ OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
+ getOmRequest());
+ OMClientResponse omClientResponse = null;
+ IOException exception = null;
+ Result result;
+ try {
+ keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
+ volumeName = keyArgs.getVolumeName();
+ bucketName = keyArgs.getBucketName();
+
+ acquiredLock = omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
+ volumeName, bucketName);
+
+ validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+
+ String fileName = OzoneFSUtils.getFileName(keyName);
+ Path keyPath = Paths.get(keyName);
+ OMFileRequest.OMPathInfoV1 pathInfoV1 =
+ OMFileRequest.verifyDirectoryKeysInPath(omMetadataManager,
+ volumeName, bucketName, keyName, keyPath);
+ long parentID = pathInfoV1.getLastKnownParentId();
+
+ dbMultipartKey = omMetadataManager.getMultipartKey(parentID,
+ fileName, uploadID);
+
+ String dbOzoneKey = omMetadataManager.getOzonePathKey(parentID, fileName);
+
+ String ozoneKey = omMetadataManager.getOzoneKey(
+ volumeName, bucketName, keyName);
+
+ OmMultipartKeyInfo multipartKeyInfo =
+ omMetadataManager.getMultipartInfoTable().get(dbMultipartKey);
+
+ // If a directory already exists with the same name, fail the request.
+ if (pathInfoV1.getDirectoryResult() == DIRECTORY_EXISTS) {
+ throw new OMException("Can not Complete MPU for file: " + keyName +
+ " as there is already directory in the given path",
+ NOT_A_FILE);
+ }
+
+ if (multipartKeyInfo == null) {
+ throw new OMException(
+ failureMessage(requestedVolume, requestedBucket, keyName),
+ OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
+ }
+ TreeMap<Integer, PartKeyInfo> partKeyInfoMap =
+ multipartKeyInfo.getPartKeyInfoMap();
+
+ if (partsList.size() > 0) {
+ if (partKeyInfoMap.size() == 0) {
+ LOG.error("Complete MultipartUpload failed for key {} , MPU Key has" +
+ " no parts in OM, parts given to upload are {}", ozoneKey,
+ partsList);
+ throw new OMException(
+ failureMessage(requestedVolume, requestedBucket, keyName),
+ OMException.ResultCodes.INVALID_PART);
+ }
+
+ // First, check for invalid part ordering.
+ List< Integer > partNumbers = new ArrayList<>();
+ int partsListSize = getPartsListSize(requestedVolume,
+ requestedBucket, keyName, ozoneKey, partNumbers, partsList);
+
+ List<OmKeyLocationInfo> partLocationInfos = new ArrayList<>();
+ long dataSize = getMultipartDataSize(requestedVolume, requestedBucket,
+ keyName, ozoneKey, partKeyInfoMap, partsListSize,
+ partLocationInfos, partsList, omMetadataManager);
+
+ // All parts have same replication information. Here getting from last
+ // part.
+ OmKeyInfo omKeyInfo = getOmKeyInfo(ozoneManager, trxnLogIndex, keyArgs,
+ volumeName, bucketName, keyName, dbMultipartKey,
+ omMetadataManager, dbOzoneKey, partKeyInfoMap,
+ partLocationInfos, dataSize);
+
+ //Find all unused parts.
+ List< OmKeyInfo > unUsedParts = new ArrayList<>();
+ for (Map.Entry< Integer, PartKeyInfo > partKeyInfo :
+ partKeyInfoMap.entrySet()) {
+ if (!partNumbers.contains(partKeyInfo.getKey())) {
+ unUsedParts.add(OmKeyInfo
+ .getFromProtobuf(partKeyInfo.getValue().getPartKeyInfo()));
+ }
+ }
+
+ updateCache(omMetadataManager, dbOzoneKey, dbMultipartKey, omKeyInfo,
+ trxnLogIndex);
+
+ omResponse.setCompleteMultiPartUploadResponse(
+ MultipartUploadCompleteResponse.newBuilder()
+ .setVolume(requestedVolume)
+ .setBucket(requestedBucket)
+ .setKey(keyName)
+ .setHash(DigestUtils.sha256Hex(keyName)));
+
+ omClientResponse = new S3MultipartUploadCompleteResponseV1(
+ omResponse.build(), dbMultipartKey, omKeyInfo, unUsedParts);
+
+ result = Result.SUCCESS;
+ } else {
+ throw new OMException(
+ failureMessage(requestedVolume, requestedBucket, keyName) +
+ " because of empty part list",
+ OMException.ResultCodes.INVALID_REQUEST);
+ }
+
+ } catch (IOException ex) {
+ result = Result.FAILURE;
+ exception = ex;
+ omClientResponse = new S3MultipartUploadCompleteResponseV1(
+ createErrorOMResponse(omResponse, exception));
+ } finally {
+ addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
+ omDoubleBufferHelper);
+ if (acquiredLock) {
+ omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK,
+ volumeName, bucketName);
+ }
+ }
+
+ logResult(ozoneManager, multipartUploadCompleteRequest, partsList,
+ auditMap, volumeName, bucketName, keyName, exception, result);
+
+ return omClientResponse;
+ }
+
+ protected OmKeyInfo getOmKeyInfoFromKeyTable(String dbOzoneFileKey,
+ String keyName, OMMetadataManager omMetadataManager) throws IOException {
+ return OMFileRequest.getOmKeyInfoFromFileTable(true,
+ omMetadataManager, dbOzoneFileKey, keyName);
+ }
+
+ @Override
+ protected OmKeyInfo getOmKeyInfoFromOpenKeyTable(String dbMultipartKey,
+ String keyName, OMMetadataManager omMetadataManager) throws IOException {
+ return OMFileRequest.getOmKeyInfoFromFileTable(true,
+ omMetadataManager, dbMultipartKey, keyName);
+ }
+
+ @Override
+ protected void updateCache(OMMetadataManager omMetadataManager,
+ String ozoneKey, String multipartKey, OmKeyInfo omKeyInfo,
+ long transactionLogIndex) {
+ // Update cache.
+ // 1. Add key entry to key table.
+ // 2. Delete multipartKey entry from openKeyTable and multipartInfo table.
+ OMFileRequest.addFileTableCacheEntry(omMetadataManager, ozoneKey,
+ omKeyInfo, omKeyInfo.getFileName(), transactionLogIndex);
+
+ omMetadataManager.getOpenKeyTable().addCacheEntry(
+ new CacheKey<>(multipartKey),
+ new CacheValue<>(Optional.absent(), transactionLogIndex));
+ omMetadataManager.getMultipartInfoTable().addCacheEntry(
+ new CacheKey<>(multipartKey),
+ new CacheValue<>(Optional.absent(), transactionLogIndex));
+ }
+
+ protected void updatePrefixFSOInfo(OmKeyInfo dbOpenKeyInfo,
+ OmKeyInfo.Builder builder) {
+ // updates parentID and fileName
+ builder.setParentObjectID(dbOpenKeyInfo.getParentObjectID());
+ builder.setFileName(dbOpenKeyInfo.getFileName());
+ }
+
+ private static String failureMessage(String volume, String bucket,
+ String keyName) {
+ return "Complete Multipart Upload Failed: volume: " +
+ volume + " bucket: " + bucket + " key: " + keyName;
+ }
+}
+
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java
index 20e398e..f593885 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java
@@ -96,4 +96,15 @@ public class S3MultipartUploadCompleteResponse extends OMClientResponse {
}
}
-}
\ No newline at end of file
+ protected String getMultipartKey() {
+ return multipartKey;
+ }
+
+ protected OmKeyInfo getOmKeyInfo() {
+ return omKeyInfo;
+ }
+
+ protected List<OmKeyInfo> getPartsUnusedList() {
+ return partsUnusedList;
+ }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponseV1.java
similarity index 66%
copy from hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java
copy to hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponseV1.java
index 20e398e..bb31dce 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponseV1.java
@@ -18,82 +18,74 @@
package org.apache.hadoop.ozone.om.response.s3.multipart;
-import java.io.IOException;
-import java.util.List;
-
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
-import org.apache.hadoop.ozone.om.response.OMClientResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
- .OMResponse;
-import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.util.List;
import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.MULTIPARTINFO_TABLE;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_KEY_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.MULTIPARTFILEINFO_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_FILE_TABLE;
/**
* Response for Multipart Upload Complete request.
*/
-@CleanupTableInfo(cleanupTables = {OPEN_KEY_TABLE, KEY_TABLE, DELETED_TABLE,
- MULTIPARTINFO_TABLE})
-public class S3MultipartUploadCompleteResponse extends OMClientResponse {
- private String multipartKey;
- private OmKeyInfo omKeyInfo;
- private List<OmKeyInfo> partsUnusedList;
+@CleanupTableInfo(cleanupTables = {OPEN_FILE_TABLE, FILE_TABLE, DELETED_TABLE,
+ MULTIPARTFILEINFO_TABLE})
+public class S3MultipartUploadCompleteResponseV1
+ extends S3MultipartUploadCompleteResponse {
- public S3MultipartUploadCompleteResponse(
+ public S3MultipartUploadCompleteResponseV1(
@Nonnull OMResponse omResponse,
@Nonnull String multipartKey,
@Nonnull OmKeyInfo omKeyInfo,
@Nonnull List<OmKeyInfo> unUsedParts) {
- super(omResponse);
- this.partsUnusedList = unUsedParts;
- this.multipartKey = multipartKey;
- this.omKeyInfo = omKeyInfo;
+ super(omResponse, multipartKey, omKeyInfo, unUsedParts);
}
/**
* For when the request is not successful.
* For a successful request, the other constructor should be used.
*/
- public S3MultipartUploadCompleteResponse(@Nonnull OMResponse omResponse) {
+ public S3MultipartUploadCompleteResponseV1(@Nonnull OMResponse omResponse) {
super(omResponse);
checkStatusNotOK();
}
+
@Override
public void addToDBBatch(OMMetadataManager omMetadataManager,
BatchOperation batchOperation) throws IOException {
omMetadataManager.getOpenKeyTable().deleteWithBatch(batchOperation,
- multipartKey);
+ getMultipartKey());
omMetadataManager.getMultipartInfoTable().deleteWithBatch(batchOperation,
- multipartKey);
+ getMultipartKey());
- String ozoneKey = omMetadataManager.getOzoneKey(omKeyInfo.getVolumeName(),
- omKeyInfo.getBucketName(), omKeyInfo.getKeyName());
- omMetadataManager.getKeyTable().putWithBatch(batchOperation, ozoneKey,
- omKeyInfo);
+ String dbFileKey = OMFileRequest.addToFileTable(omMetadataManager,
+ batchOperation, getOmKeyInfo());
- if (!partsUnusedList.isEmpty()) {
+ if (!getPartsUnusedList().isEmpty()) {
// Add unused parts to deleted key table.
RepeatedOmKeyInfo repeatedOmKeyInfo = omMetadataManager.getDeletedTable()
- .get(ozoneKey);
+ .get(dbFileKey);
if (repeatedOmKeyInfo == null) {
- repeatedOmKeyInfo = new RepeatedOmKeyInfo(partsUnusedList);
+ repeatedOmKeyInfo = new RepeatedOmKeyInfo(getPartsUnusedList());
} else {
- repeatedOmKeyInfo.addOmKeyInfo(omKeyInfo);
+ repeatedOmKeyInfo.addOmKeyInfo(getOmKeyInfo());
}
omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
- ozoneKey, repeatedOmKeyInfo);
+ dbFileKey, repeatedOmKeyInfo);
}
}
+}
-}
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
index 9f6cff8..16cb4ae 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
@@ -115,7 +115,7 @@ public class TestS3MultipartRequest {
keyName);
S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
- new S3InitiateMultipartUploadRequest(omRequest);
+ getS3InitiateMultipartUploadReq(omRequest);
OMRequest modifiedRequest =
s3InitiateMultipartUploadRequest.preExecute(ozoneManager);
@@ -204,7 +204,7 @@ public class TestS3MultipartRequest {
keyName, multipartUploadID, partList);
S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest =
- new S3MultipartUploadCompleteRequest(omRequest);
+ getS3MultipartUploadCompleteReq(omRequest);
OMRequest modifiedRequest =
s3MultipartUploadCompleteRequest.preExecute(ozoneManager);
@@ -247,6 +247,11 @@ public class TestS3MultipartRequest {
return modifiedRequest;
}
+ protected S3MultipartUploadCompleteRequest getS3MultipartUploadCompleteReq(
+ OMRequest omRequest) {
+ return new S3MultipartUploadCompleteRequest(omRequest);
+ }
+
protected S3MultipartUploadCommitPartRequest getS3MultipartUploadCommitReq(
OMRequest omRequest) {
return new S3MultipartUploadCommitPartRequest(omRequest);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java
index a04f51f..3d399b1 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.om.request.s3.multipart;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -55,7 +56,7 @@ public class TestS3MultipartUploadCompleteRequest
public void testValidateAndUpdateCacheSuccess() throws Exception {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
- String keyName = UUID.randomUUID().toString();
+ String keyName = getKeyName();
TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
omMetadataManager);
@@ -64,7 +65,7 @@ public class TestS3MultipartUploadCompleteRequest
bucketName, keyName);
S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
- new S3InitiateMultipartUploadRequest(initiateMPURequest);
+ getS3InitiateMultipartUploadReq(initiateMPURequest);
OMClientResponse omClientResponse =
s3InitiateMultipartUploadRequest.validateAndUpdateCache(ozoneManager,
@@ -78,27 +79,25 @@ public class TestS3MultipartUploadCompleteRequest
bucketName, keyName, clientID, multipartUploadID, 1);
S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest =
- new S3MultipartUploadCommitPartRequest(commitMultipartRequest);
+ getS3MultipartUploadCommitReq(commitMultipartRequest);
// Add key to open key table.
- TestOMRequestUtils.addKeyToTable(true, volumeName, bucketName,
- keyName, clientID, HddsProtos.ReplicationType.RATIS,
- HddsProtos.ReplicationFactor.ONE, omMetadataManager);
+ addKeyToTable(volumeName, bucketName, keyName, clientID);
s3MultipartUploadCommitPartRequest.validateAndUpdateCache(ozoneManager,
2L, ozoneManagerDoubleBufferHelper);
List<Part> partList = new ArrayList<>();
- partList.add(Part.newBuilder().setPartName(
- omMetadataManager.getOzoneKey(volumeName, bucketName, keyName) +
- clientID).setPartNumber(1).build());
+ String partName = getPartName(volumeName, bucketName, keyName, clientID);
+ partList.add(Part.newBuilder().setPartName(partName).setPartNumber(1)
+ .build());
OMRequest completeMultipartRequest = doPreExecuteCompleteMPU(volumeName,
bucketName, keyName, multipartUploadID, partList);
S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest =
- new S3MultipartUploadCompleteRequest(completeMultipartRequest);
+ getS3MultipartUploadCompleteReq(completeMultipartRequest);
omClientResponse =
s3MultipartUploadCompleteRequest.validateAndUpdateCache(ozoneManager,
@@ -107,14 +106,71 @@ public class TestS3MultipartUploadCompleteRequest
Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
omClientResponse.getOMResponse().getStatus());
- String multipartKey = omMetadataManager.getMultipartKey(volumeName,
- bucketName, keyName, multipartUploadID);
+ String multipartKey = getMultipartKey(volumeName, bucketName, keyName,
+ multipartUploadID);
Assert.assertNull(omMetadataManager.getOpenKeyTable().get(multipartKey));
Assert.assertNull(
omMetadataManager.getMultipartInfoTable().get(multipartKey));
Assert.assertNotNull(omMetadataManager.getKeyTable().get(
- omMetadataManager.getOzoneKey(volumeName, bucketName, keyName)));
+ getOzoneDBKey(volumeName, bucketName, keyName)));
+ }
+
+ @Test
+ public void testInvalidPartOrderError() throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ String keyName = getKeyName();
+
+ TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+ omMetadataManager);
+
+ OMRequest initiateMPURequest = doPreExecuteInitiateMPU(volumeName,
+ bucketName, keyName);
+
+ S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
+ getS3InitiateMultipartUploadReq(initiateMPURequest);
+
+ OMClientResponse omClientResponse =
+ s3InitiateMultipartUploadRequest.validateAndUpdateCache(
+ ozoneManager, 1L, ozoneManagerDoubleBufferHelper);
+
+ long clientID = Time.now();
+ String multipartUploadID = omClientResponse.getOMResponse()
+ .getInitiateMultiPartUploadResponse().getMultipartUploadID();
+
+ OMRequest commitMultipartRequest = doPreExecuteCommitMPU(volumeName,
+ bucketName, keyName, clientID, multipartUploadID, 1);
+
+ S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest =
+ getS3MultipartUploadCommitReq(commitMultipartRequest);
+
+ // Add key to open key table.
+ addKeyToTable(volumeName, bucketName, keyName, clientID);
+
+ s3MultipartUploadCommitPartRequest.validateAndUpdateCache(ozoneManager,
+ 2L, ozoneManagerDoubleBufferHelper);
+
+ List<Part> partList = new ArrayList<>();
+
+ String partName = getPartName(volumeName, bucketName, keyName, clientID);
+ partList.add(Part.newBuilder().setPartName(partName).setPartNumber(23)
+ .build());
+ partList.add(Part.newBuilder().setPartName(partName).setPartNumber(1)
+ .build());
+
+ OMRequest completeMultipartRequest = doPreExecuteCompleteMPU(volumeName,
+ bucketName, keyName, multipartUploadID, partList);
+
+ S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest =
+ getS3MultipartUploadCompleteReq(completeMultipartRequest);
+
+ omClientResponse =
+ s3MultipartUploadCompleteRequest.validateAndUpdateCache(
+ ozoneManager, 3L, ozoneManagerDoubleBufferHelper);
+
+ Assert.assertEquals(OzoneManagerProtocolProtos.Status.INVALID_PART_ORDER,
+ omClientResponse.getOMResponse().getStatus());
}
@Test
@@ -129,7 +185,7 @@ public class TestS3MultipartUploadCompleteRequest
bucketName, keyName, UUID.randomUUID().toString(), partList);
S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest =
- new S3MultipartUploadCompleteRequest(completeMultipartRequest);
+ getS3MultipartUploadCompleteReq(completeMultipartRequest);
OMClientResponse omClientResponse =
s3MultipartUploadCompleteRequest.validateAndUpdateCache(ozoneManager,
@@ -153,7 +209,7 @@ public class TestS3MultipartUploadCompleteRequest
bucketName, keyName, UUID.randomUUID().toString(), partList);
S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest =
- new S3MultipartUploadCompleteRequest(completeMultipartRequest);
+ getS3MultipartUploadCompleteReq(completeMultipartRequest);
OMClientResponse omClientResponse =
s3MultipartUploadCompleteRequest.validateAndUpdateCache(ozoneManager,
@@ -180,7 +236,7 @@ public class TestS3MultipartUploadCompleteRequest
// Doing complete multipart upload request with out initiate.
S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest =
- new S3MultipartUploadCompleteRequest(completeMultipartRequest);
+ getS3MultipartUploadCompleteReq(completeMultipartRequest);
OMClientResponse omClientResponse =
s3MultipartUploadCompleteRequest.validateAndUpdateCache(ozoneManager,
@@ -191,5 +247,35 @@ public class TestS3MultipartUploadCompleteRequest
omClientResponse.getOMResponse().getStatus());
}
+
+ protected void addKeyToTable(String volumeName, String bucketName,
+ String keyName, long clientID) throws Exception {
+ TestOMRequestUtils.addKeyToTable(true, volumeName, bucketName,
+ keyName, clientID, HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.ONE, omMetadataManager);
+ }
+
+ protected String getMultipartKey(String volumeName, String bucketName,
+ String keyName, String multipartUploadID) throws IOException {
+ return omMetadataManager.getMultipartKey(volumeName,
+ bucketName, keyName, multipartUploadID);
+ }
+
+ private String getPartName(String volumeName, String bucketName,
+ String keyName, long clientID) throws IOException {
+
+ String dbOzoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
+ keyName);
+ return dbOzoneKey + clientID;
+ }
+
+ protected String getOzoneDBKey(String volumeName, String bucketName,
+ String keyName) throws IOException {
+ return omMetadataManager.getOzoneKey(volumeName, bucketName, keyName);
+ }
+
+ protected String getKeyName() {
+ return UUID.randomUUID().toString();
+ }
}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestV1.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestV1.java
new file mode 100644
index 0000000..cd5051f
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestV1.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.s3.multipart;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Iterator;
+import java.util.UUID;
+
+/**
+ * Tests S3 Multipart Upload Complete request V1 (prefix-based layout).
+ */
+public class TestS3MultipartUploadCompleteRequestV1
+ extends TestS3MultipartUploadCompleteRequest {
+
+ @BeforeClass
+ public static void init() {
+ OzoneManagerRatisUtils.setBucketFSOptimized(true);
+ }
+
+ protected String getKeyName() {
+ String parentDir = UUID.randomUUID().toString() + "/a/b/c";
+ String fileName = "file1";
+ String keyName = parentDir + OzoneConsts.OM_KEY_PREFIX + fileName;
+ return keyName;
+ }
+
+ protected void addKeyToTable(String volumeName, String bucketName,
+ String keyName, long clientID) throws Exception {
+ // need to initialize parentID
+ String parentDir = OzoneFSUtils.getParentDir(keyName);
+ Assert.assertNotEquals("Parent doesn't exists!", parentDir, keyName);
+
+ // add parentDir to dirTable
+ long parentID = getParentID(volumeName, bucketName, keyName);
+ long txnId = 50;
+ long objectId = parentID + 1;
+
+ OmKeyInfo omKeyInfoV1 =
+ TestOMRequestUtils.createOmKeyInfo(volumeName, bucketName, keyName,
+ HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.ONE, objectId, parentID, txnId,
+ Time.now());
+
+ // add key to openFileTable
+ String fileName = OzoneFSUtils.getFileName(keyName);
+ omKeyInfoV1.setKeyName(fileName);
+ TestOMRequestUtils.addFileToKeyTable(true, false,
+ fileName, omKeyInfoV1, clientID, omKeyInfoV1.getObjectID(),
+ omMetadataManager);
+ }
+
+ protected String getMultipartKey(String volumeName, String bucketName,
+ String keyName, String multipartUploadID) throws IOException {
+ OzoneFileStatus keyStatus = OMFileRequest.getOMKeyInfoIfExists(
+ omMetadataManager, volumeName,
+ bucketName, keyName, 0);
+
+ Assert.assertNotNull("key not found in DB!", keyStatus);
+
+ return omMetadataManager.getMultipartKey(keyStatus.getKeyInfo()
+ .getParentObjectID(), keyStatus.getTrimmedName(),
+ multipartUploadID);
+ }
+
+ private long getParentID(String volumeName, String bucketName,
+ String keyName) throws IOException {
+ Path keyPath = Paths.get(keyName);
+ Iterator<Path> elements = keyPath.iterator();
+ String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
+ OmBucketInfo omBucketInfo =
+ omMetadataManager.getBucketTable().get(bucketKey);
+
+ return OMFileRequest.getParentID(omBucketInfo.getObjectID(),
+ elements, keyName, omMetadataManager);
+ }
+
+ protected String getOzoneDBKey(String volumeName, String bucketName,
+ String keyName) throws IOException {
+ long parentID = getParentID(volumeName, bucketName, keyName);
+ String fileName = OzoneFSUtils.getFileName(keyName);
+ return omMetadataManager.getOzonePathKey(parentID, fileName);
+ }
+
+ protected S3MultipartUploadCompleteRequest getS3MultipartUploadCompleteReq(
+ OMRequest omRequest) {
+ return new S3MultipartUploadCompleteRequestV1(omRequest);
+ }
+
+ protected S3MultipartUploadCommitPartRequest getS3MultipartUploadCommitReq(
+ OMRequest omRequest) {
+ return new S3MultipartUploadCommitPartRequestV1(omRequest);
+ }
+
+ protected S3InitiateMultipartUploadRequest getS3InitiateMultipartUploadReq(
+ OMRequest initiateMPURequest) {
+ return new S3InitiateMultipartUploadRequestV1(initiateMPURequest);
+ }
+
+}
+
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java
index 106ae61..6f4d6fa 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java
@@ -268,6 +268,27 @@ public class TestS3MultipartResponse {
openPartKeyInfoToBeDeleted, isRatisEnabled, omBucketInfo);
}
+ @SuppressWarnings("checkstyle:ParameterNumber")
+ public S3MultipartUploadCompleteResponse createS3CompleteMPUResponseV1(
+ String volumeName, String bucketName, long parentID, String keyName,
+ String multipartUploadID, OmKeyInfo omKeyInfo,
+ OzoneManagerProtocolProtos.Status status,
+ List<OmKeyInfo> unUsedParts) {
+
+ String multipartKey = getMultipartKey(parentID, keyName, multipartUploadID);
+
+ OMResponse omResponse = OMResponse.newBuilder()
+ .setCmdType(OzoneManagerProtocolProtos.Type.CompleteMultiPartUpload)
+ .setStatus(status).setSuccess(true)
+ .setCompleteMultiPartUploadResponse(
+ OzoneManagerProtocolProtos.MultipartUploadCompleteResponse
+ .newBuilder().setBucket(bucketName)
+ .setVolume(volumeName).setKey(keyName)).build();
+
+ return new S3MultipartUploadCompleteResponseV1(omResponse, multipartKey,
+ omKeyInfo, unUsedParts);
+ }
+
private String getMultipartKey(long parentID, String keyName,
String multipartUploadID) {
String fileName = OzoneFSUtils.getFileName(keyName);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadCompleteResponseV1.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadCompleteResponseV1.java
new file mode 100644
index 0000000..2683273
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadCompleteResponseV1.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.s3.multipart;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Tests S3 multipart upload complete response V1 (prefix-based layout).
+ */
+public class TestS3MultipartUploadCompleteResponseV1
+ extends TestS3MultipartResponse {
+
+ private String dirName = "a/b/c/";
+
+ private long parentID;
+
+ @Test
+ public void testAddDBToBatch() throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ String keyName = getKeyName();
+ String multipartUploadID = UUID.randomUUID().toString();
+
+ TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+ omMetadataManager);
+
+ long txnId = 50;
+ long objectId = parentID + 1;
+ String fileName = OzoneFSUtils.getFileName(keyName);
+ String dbMultipartKey = omMetadataManager.getMultipartKey(parentID,
+ fileName, multipartUploadID);
+ long clientId = Time.now();
+ String dbOpenKey = omMetadataManager.getOpenFileName(parentID, fileName,
+ clientId);
+ String dbKey = omMetadataManager.getOzonePathKey(parentID, fileName);
+ OmKeyInfo omKeyInfoV1 =
+ TestOMRequestUtils.createOmKeyInfo(volumeName, bucketName, keyName,
+ HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.ONE, objectId, parentID, txnId,
+ Time.now());
+
+ // add key to openFileTable
+ omKeyInfoV1.setKeyName(fileName);
+ TestOMRequestUtils.addFileToKeyTable(true, false,
+ fileName, omKeyInfoV1, clientId, omKeyInfoV1.getObjectID(),
+ omMetadataManager);
+
+ addS3MultipartUploadCommitPartResponseV1(volumeName, bucketName, keyName,
+ multipartUploadID, dbOpenKey);
+
+ List<OmKeyInfo> unUsedParts = new ArrayList<>();
+ S3MultipartUploadCompleteResponse s3MultipartUploadCompleteResponse =
+ createS3CompleteMPUResponseV1(volumeName, bucketName, parentID,
+ keyName, multipartUploadID, omKeyInfoV1,
+ OzoneManagerProtocolProtos.Status.OK, unUsedParts);
+
+ s3MultipartUploadCompleteResponse.addToDBBatch(omMetadataManager,
+ batchOperation);
+
+ omMetadataManager.getStore().commitBatchOperation(batchOperation);
+
+ Assert.assertNotNull(omMetadataManager.getKeyTable().get(dbKey));
+ Assert.assertNull(
+ omMetadataManager.getMultipartInfoTable().get(dbMultipartKey));
+ Assert.assertNull(
+ omMetadataManager.getOpenKeyTable().get(dbMultipartKey));
+
+ // As no parts were created, no entries should be present in the delete table.
+ Assert.assertEquals(0, omMetadataManager.countRowsInTable(
+ omMetadataManager.getDeletedTable()));
+ }
+
+ @Test
+ public void testAddDBToBatchWithParts() throws Exception {
+
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ String keyName = getKeyName();
+
+ TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+ omMetadataManager);
+ createParentPath(volumeName, bucketName);
+
+ String multipartUploadID = UUID.randomUUID().toString();
+
+ int deleteEntryCount = 0;
+
+ String fileName = OzoneFSUtils.getFileName(keyName);
+ String dbMultipartKey = omMetadataManager.getMultipartKey(parentID,
+ fileName, multipartUploadID);
+
+ S3InitiateMultipartUploadResponse s3InitiateMultipartUploadResponseV1 =
+ addS3InitiateMultipartUpload(volumeName, bucketName, keyName,
+ multipartUploadID);
+
+ // Add some dummy parts for testing.
+ // No key locations are added, since this test only verifies whether
+ // entries are added to the delete table or not.
+ OmMultipartKeyInfo omMultipartKeyInfo =
+ s3InitiateMultipartUploadResponseV1.getOmMultipartKeyInfo();
+
+ OmKeyInfo omKeyInfoV1 = commitS3MultipartUpload(volumeName, bucketName,
+ keyName, multipartUploadID, fileName, dbMultipartKey,
+ omMultipartKeyInfo);
+ // After the commit, an entry is added to the deleted table.
+ deleteEntryCount++;
+
+ OmKeyInfo omKeyInfo =
+ TestOMRequestUtils.createOmKeyInfo(volumeName, bucketName, keyName,
+ HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.ONE,
+ parentID + 10,
+ parentID, 100, Time.now());
+ List<OmKeyInfo> unUsedParts = new ArrayList<>();
+ unUsedParts.add(omKeyInfo);
+ S3MultipartUploadCompleteResponse s3MultipartUploadCompleteResponse =
+ createS3CompleteMPUResponseV1(volumeName, bucketName, parentID,
+ keyName, multipartUploadID, omKeyInfoV1,
+ OzoneManagerProtocolProtos.Status.OK, unUsedParts);
+
+ s3MultipartUploadCompleteResponse.addToDBBatch(omMetadataManager,
+ batchOperation);
+
+ omMetadataManager.getStore().commitBatchOperation(batchOperation);
+ String dbKey = omMetadataManager.getOzonePathKey(parentID,
+ omKeyInfoV1.getFileName());
+ Assert.assertNotNull(omMetadataManager.getKeyTable().get(dbKey));
+ Assert.assertNull(
+ omMetadataManager.getMultipartInfoTable().get(dbMultipartKey));
+ Assert.assertNull(
+ omMetadataManager.getOpenKeyTable().get(dbMultipartKey));
+
+ // As 1 unused part exists, 1 unused entry should be present in the delete
+ // table.
+ deleteEntryCount++;
+ Assert.assertEquals(deleteEntryCount, omMetadataManager.countRowsInTable(
+ omMetadataManager.getDeletedTable()));
+ }
+
+ private OmKeyInfo commitS3MultipartUpload(String volumeName,
+ String bucketName, String keyName, String multipartUploadID,
+ String fileName, String multipartKey,
+ OmMultipartKeyInfo omMultipartKeyInfo) throws IOException {
+
+ PartKeyInfo part1 = createPartKeyInfoV1(volumeName, bucketName, parentID,
+ fileName, 1);
+
+ addPart(1, part1, omMultipartKeyInfo);
+
+ long clientId = Time.now();
+ String openKey = omMetadataManager.getOpenFileName(parentID, fileName,
+ clientId);
+
+ S3MultipartUploadCommitPartResponse s3MultipartUploadCommitPartResponse =
+ createS3CommitMPUResponseV1(volumeName, bucketName, parentID,
+ keyName, multipartUploadID,
+ omMultipartKeyInfo.getPartKeyInfo(1),
+ omMultipartKeyInfo,
+ OzoneManagerProtocolProtos.Status.OK, openKey);
+
+ s3MultipartUploadCommitPartResponse.checkAndUpdateDB(omMetadataManager,
+ batchOperation);
+
+ Assert.assertNull(omMetadataManager.getOpenKeyTable().get(multipartKey));
+ Assert.assertNull(
+ omMetadataManager.getMultipartInfoTable().get(multipartKey));
+
+ omMetadataManager.getStore().commitBatchOperation(batchOperation);
+
+ // As 1 part was created, 1 entry should be present in the delete table.
+ Assert.assertEquals(1, omMetadataManager.countRowsInTable(
+ omMetadataManager.getDeletedTable()));
+
+ String part1DeletedKeyName =
+ omMultipartKeyInfo.getPartKeyInfo(1).getPartName();
+
+ Assert.assertNotNull(omMetadataManager.getDeletedTable().get(
+ part1DeletedKeyName));
+
+ RepeatedOmKeyInfo ro =
+ omMetadataManager.getDeletedTable().get(part1DeletedKeyName);
+ OmKeyInfo omPartKeyInfo = OmKeyInfo.getFromProtobuf(part1.getPartKeyInfo());
+ Assert.assertEquals(omPartKeyInfo, ro.getOmKeyInfoList().get(0));
+
+ return omPartKeyInfo;
+ }
+
+ private S3InitiateMultipartUploadResponse addS3InitiateMultipartUpload(
+ String volumeName, String bucketName, String keyName,
+ String multipartUploadID) throws IOException {
+
+ S3InitiateMultipartUploadResponse s3InitiateMultipartUploadResponseV1 =
+ createS3InitiateMPUResponseV1(volumeName, bucketName, parentID,
+ keyName, multipartUploadID, new ArrayList<>());
+
+ s3InitiateMultipartUploadResponseV1.addToDBBatch(omMetadataManager,
+ batchOperation);
+
+ return s3InitiateMultipartUploadResponseV1;
+ }
+
+ private String getKeyName() {
+ return dirName + UUID.randomUUID().toString();
+ }
+
+ private void createParentPath(String volumeName, String bucketName)
+ throws Exception {
+ // Create parent dirs for the path
+ parentID = TestOMRequestUtils.addParentsToDirTable(volumeName, bucketName,
+ dirName, omMetadataManager);
+ }
+
+ private void addS3MultipartUploadCommitPartResponseV1(String volumeName,
+ String bucketName, String keyName, String multipartUploadID,
+ String openKey) throws IOException {
+ S3MultipartUploadCommitPartResponse s3MultipartUploadCommitPartResponse =
+ createS3CommitMPUResponseV1(volumeName, bucketName, parentID,
+ keyName, multipartUploadID, null, null,
+ OzoneManagerProtocolProtos.Status.OK, openKey);
+
+ s3MultipartUploadCommitPartResponse.addToDBBatch(omMetadataManager,
+ batchOperation);
+
+ omMetadataManager.getStore().commitBatchOperation(batchOperation);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org