You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ca...@apache.org on 2022/07/04 12:54:06 UTC
[ozone] 16/38: HDDS-5879. [Ozone-Streaming] OzoneBucket add the createMultipartStreamKey method (#2760)
This is an automated email from the ASF dual-hosted git repository.
captainzmc pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit be9641f9fa281828621d5137fbbd7a1384096301
Author: hao guo <gu...@360.cn>
AuthorDate: Fri Nov 19 11:21:55 2021 +0800
HDDS-5879. [Ozone-Streaming] OzoneBucket add the createMultipartStreamKey method (#2760)
---
.../apache/hadoop/ozone/client/OzoneBucket.java | 15 +++++
.../ozone/client/protocol/ClientProtocol.java | 18 ++++++
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 64 ++++++++++++++++++++++
.../client/rpc/TestOzoneRpcClientWithRatis.java | 53 ++++++++++++++++++
4 files changed, 150 insertions(+)
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index a11c2dbac5..68ea6304ee 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -810,6 +810,21 @@ public class OzoneBucket extends WithMetadata {
uploadID);
}
+ /**
+ * Create a streaming output for writing one part of a multipart upload key.
+ * @param key the key name of the multipart upload
+ * @param size the expected size of this part, in bytes
+ * @param partNumber the part number (1-based) identifying this part
+ * @param uploadID the upload ID returned when the multipart upload was initiated
+ * @return OzoneDataStreamOutput for writing the part data
+ * @throws IOException if the part key cannot be created
+ */
+ public OzoneDataStreamOutput createMultipartStreamKey(String key,
+ long size, int partNumber, String uploadID) throws IOException {
+ return proxy.createMultipartStreamKey(volumeName, name,
+ key, size, partNumber, uploadID);
+ }
+
/**
* Complete Multipart upload. This will combine all the parts and make the
* key visible in ozone.
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index 8f7f385628..7660fc0e46 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -494,6 +494,24 @@ public interface ClientProtocol {
int partNumber, String uploadID)
throws IOException;
+ /**
+ * Create a streaming output for writing one part of a multipart upload key.
+ * @param volumeName the name of the volume containing the key
+ * @param bucketName the name of the bucket containing the key
+ * @param keyName the key name of the multipart upload
+ * @param size the expected size of this part, in bytes
+ * @param partNumber the part number (1-based) identifying this part
+ * @param uploadID the upload ID returned when the multipart upload was initiated
+ * @return OzoneDataStreamOutput for writing the part data
+ * @throws IOException if the part key cannot be created
+ */
+ OzoneDataStreamOutput createMultipartStreamKey(String volumeName,
+ String bucketName,
+ String keyName, long size,
+ int partNumber,
+ String uploadID)
+ throws IOException;
+
/**
* Complete Multipart upload. This will combine all the parts and make the
* key visible in ozone.
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index c52352bdca..62b9a868b3 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -1570,6 +1570,70 @@ public class RpcClient implements ClientProtocol {
}
}
+ @Override
+ public OzoneDataStreamOutput createMultipartStreamKey(
+ String volumeName,
+ String bucketName,
+ String keyName,
+ long size,
+ int partNumber,
+ String uploadID)
+ throws IOException {
+ verifyVolumeName(volumeName);
+ verifyBucketName(bucketName);
+ if (checkKeyNameEnabled) {
+ HddsClientUtils.verifyKeyName(keyName);
+ }
+ HddsClientUtils.checkNotNull(keyName, uploadID);
+ Preconditions.checkArgument(partNumber > 0 && partNumber <= 10000, "Part " +
+ "number should be greater than zero and less than or equal to 10000");
+ Preconditions.checkArgument(size >= 0, "size should be greater than or " +
+ "equal to zero");
+ String requestId = UUID.randomUUID().toString();
+
+ OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+ .setVolumeName(volumeName)
+ .setBucketName(bucketName)
+ .setKeyName(keyName)
+ .setDataSize(size)
+ .setIsMultipartKey(true)
+ .setMultipartUploadID(uploadID)
+ .setMultipartUploadPartNumber(partNumber)
+ .setAcls(getAclList())
+ .build();
+
+ OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
+
+ KeyDataStreamOutput keyOutputStream =
+ new KeyDataStreamOutput.Builder()
+ .setHandler(openKey)
+ .setXceiverClientManager(xceiverClientManager)
+ .setOmClient(ozoneManagerClient)
+ .setRequestID(requestId)
+ .setReplicationConfig(openKey.getKeyInfo().getReplicationConfig())
+ .setMultipartNumber(partNumber)
+ .setMultipartUploadID(uploadID)
+ .setIsMultipartKey(true)
+ .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
+ .setConfig(clientConfig)
+ .build();
+ keyOutputStream
+ .addPreallocateBlocks(
+ openKey.getKeyInfo().getLatestVersionLocations(),
+ openKey.getOpenVersion());
+
+ FileEncryptionInfo feInfo = openKey.getKeyInfo().getFileEncryptionInfo();
+ if (feInfo != null) {
+ // TODO: add support for file encryption in stream keys; see
+ // https://issues.apache.org/jira/browse/HDDS-5892
+ throw new UnsupportedOperationException(
+ "FileEncryptionInfo is not yet supported in " +
+ "createMultipartStreamKey");
+ } else {
+ return new OzoneDataStreamOutput(keyOutputStream);
+ }
+ }
+
@Override
public OmMultipartUploadCompleteInfo completeMultipartUpload(
String volumeName, String bucketName, String keyName, String uploadID,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
index 362a218af2..d9c67609e6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
@@ -19,10 +19,12 @@
package org.apache.hadoop.ozone.client.rpc;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.UUID;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -31,12 +33,15 @@ import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.junit.jupiter.api.AfterAll;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeAll;
@@ -44,6 +49,7 @@ import org.junit.jupiter.api.Test;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
/**
@@ -155,4 +161,51 @@ public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract {
}
}
}
+
+ @Test
+ public void testMultiPartUploadWithStream() throws IOException {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ String keyName = UUID.randomUUID().toString();
+
+ byte[] sampleData = new byte[1024 * 8];
+
+ int valueLength = sampleData.length;
+
+ getStore().createVolume(volumeName);
+ OzoneVolume volume = getStore().getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
+ ReplicationConfig replicationConfig =
+ ReplicationConfig.fromTypeAndFactor(
+ ReplicationType.RATIS,
+ THREE);
+
+ OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
+ replicationConfig);
+
+ assertNotNull(multipartInfo);
+ String uploadID = multipartInfo.getUploadID();
+ Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
+ Assert.assertEquals(bucketName, multipartInfo.getBucketName());
+ Assert.assertEquals(keyName, multipartInfo.getKeyName());
+ assertNotNull(multipartInfo.getUploadID());
+
+ OzoneDataStreamOutput ozoneStreamOutput = bucket.createMultipartStreamKey(
+ keyName, valueLength, 1, uploadID);
+ ozoneStreamOutput.write(ByteBuffer.wrap(sampleData), 0,
+ valueLength);
+ ozoneStreamOutput.close();
+
+ OzoneMultipartUploadPartListParts parts =
+ bucket.listParts(keyName, uploadID, 0, 1);
+
+ Assert.assertEquals(parts.getPartInfoList().size(), 1);
+
+ OzoneMultipartUploadPartListParts.PartInfo partInfo =
+ parts.getPartInfoList().get(0);
+ Assert.assertEquals(valueLength, partInfo.getSize());
+
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org