You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/11/07 18:12:33 UTC
[ozone] 40/40: HDDS-7438. [Ozone-Streaming] Add a createStreamKey method to OzoneBucket. (#3914)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit cfdf9cd171c0b1490598753453d930e477ba2075
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Oct 31 22:02:02 2022 +0800
HDDS-7438. [Ozone-Streaming] Add a createStreamKey method to OzoneBucket. (#3914)
(cherry picked from commit eedd983155c62e111ffee2d685840c27b260770f)
---
.../apache/hadoop/ozone/client/OzoneBucket.java | 32 ++++++---
.../client/rpc/TestOzoneRpcClientWithRatis.java | 76 ++++++++++++++++++++++
2 files changed, 100 insertions(+), 8 deletions(-)
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index d171a43f8a..b0a8e965c4 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.util.Time;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -600,6 +601,21 @@ public class OzoneBucket extends WithMetadata {
.createKey(volumeName, name, key, size, replicationConfig, keyMetadata);
}
+ /**
+ * Creates a new key in the bucket to be written through a data stream,
+ * using this bucket's {@code defaultReplication} and empty key metadata.
+ *
+ * @param key Name of the key to be created.
+ * @param size Size of the data the key will point to.
+ * @return OzoneDataStreamOutput to which the data has to be written.
+ * @throws IOException if the delegated createStreamKey call fails.
+ */
+ public OzoneDataStreamOutput createStreamKey(String key, long size)
+ throws IOException {
+ return createStreamKey(key, size, defaultReplication,
+ Collections.emptyMap());
+ }
+
/**
* Creates a new key in the bucket.
*
@@ -610,12 +626,13 @@ public class OzoneBucket extends WithMetadata {
* @throws IOException
*/
public OzoneDataStreamOutput createStreamKey(String key, long size,
- ReplicationConfig replicationConfig,
- Map<String, String> keyMetadata)
+ ReplicationConfig replicationConfig, Map<String, String> keyMetadata)
throws IOException {
- return proxy
- .createStreamKey(volumeName, name, key, size, replicationConfig,
- keyMetadata);
+ if (replicationConfig == null) {
+ replicationConfig = defaultReplication;
+ }
+ return proxy.createStreamKey(volumeName, name, key, size,
+ replicationConfig, keyMetadata);
}
/**
@@ -958,9 +975,8 @@ public class OzoneBucket extends WithMetadata {
public OzoneDataStreamOutput createStreamFile(String keyName, long size,
ReplicationConfig replicationConfig, boolean overWrite,
boolean recursive) throws IOException {
- return proxy
- .createStreamFile(volumeName, name, keyName, size, replicationConfig,
- overWrite, recursive);
+ return proxy.createStreamFile(volumeName, name, keyName, size,
+ replicationConfig, overWrite, recursive);
}
/**
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
index 65551bc4e9..ca26ca177c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
@@ -18,21 +18,29 @@
package org.apache.hadoop.ozone.client.rpc;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.HashMap;
import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.BucketArgs;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
@@ -42,8 +50,10 @@ import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -208,4 +218,70 @@ public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract {
Assert.assertEquals(valueLength, partInfo.getSize());
}
+
+ @Test
+ public void testUploadWithStreamAndMemoryMappedBuffer() throws IOException {
+ // create a local working dir (temp path derived from the class name)
+ final String dir = GenericTestUtils.getTempPath(
+ getClass().getSimpleName());
+ GenericTestUtils.assertDirCreation(new File(dir));
+
+ // create a local file holding 8 * chunkSize random bytes
+ final int chunkSize = 1024;
+ final byte[] data = new byte[8 * chunkSize];
+ ThreadLocalRandom.current().nextBytes(data);
+ final File file = new File(dir, "data");
+ try (FileOutputStream out = new FileOutputStream(file)) {
+ out.write(data);
+ }
+
+ // create a volume with a random, unique name
+ final String volumeName = "vol-" + UUID.randomUUID();
+ getStore().createVolume(volumeName);
+ final OzoneVolume volume = getStore().getVolume(volumeName);
+
+ // create a bucket whose default replication is RATIS/THREE
+ final String bucketName = "buck-" + UUID.randomUUID();
+ final BucketArgs bucketArgs = BucketArgs.newBuilder()
+ .setDefaultReplicationConfig(
+ new DefaultReplicationConfig(ReplicationType.RATIS, THREE))
+ .build();
+ volume.createBucket(bucketName, bucketArgs);
+ final OzoneBucket bucket = volume.getBucket(bucketName);
+
+ // upload the file as a key, one read-only mapped chunk per write
+ final String keyName = "key-" + UUID.randomUUID();
+ try (RandomAccessFile raf = new RandomAccessFile(file, "r");
+ OzoneDataStreamOutput out = bucket.createStreamKey(
+ keyName, data.length)) {
+ final FileChannel channel = raf.getChannel();
+ long off = 0;
+ for (long len = raf.length(); len > 0;) {
+ final long writeLen = Math.min(len, chunkSize);
+ final ByteBuffer mapped = channel.map(FileChannel.MapMode.READ_ONLY,
+ off, writeLen);
+ out.write(mapped);
+ off += writeLen;
+ len -= writeLen;
+ }
+ }
+
+ // verify the key name and data size
+ final OzoneKeyDetails keyDetails = bucket.getKey(keyName);
+ Assertions.assertEquals(keyName, keyDetails.getName());
+ Assertions.assertEquals(data.length, keyDetails.getDataSize());
+
+ // read the key back fully and compare it with the local data
+ final byte[] buffer = new byte[data.length];
+ try (OzoneInputStream in = keyDetails.getContent()) {
+ for (int off = 0; off < data.length;) {
+ final int n = in.read(buffer, off, data.length - off);
+ if (n < 0) {
+ break;
+ }
+ off += n;
+ }
+ }
+ Assertions.assertArrayEquals(data, buffer);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org