You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ca...@apache.org on 2022/10/31 14:02:09 UTC

[ozone] branch HDDS-4454 updated: HDDS-7438. Add a createStreamKey method to OzoneBucket. (#3914)

This is an automated email from the ASF dual-hosted git repository.

captainzmc pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-4454 by this push:
     new eedd983155 HDDS-7438. Add a createStreamKey method to OzoneBucket. (#3914)
eedd983155 is described below

commit eedd983155c62e111ffee2d685840c27b260770f
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Oct 31 22:02:02 2022 +0800

    HDDS-7438. Add a createStreamKey method to OzoneBucket. (#3914)
    
    * HDDS-7438. Add a createStreamKey method to OzoneBucket.
---
 .../apache/hadoop/ozone/client/OzoneBucket.java    | 32 ++++++---
 .../client/rpc/TestOzoneRpcClientWithRatis.java    | 76 ++++++++++++++++++++++
 2 files changed, 100 insertions(+), 8 deletions(-)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index d171a43f8a..b0a8e965c4 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.util.Time;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -600,6 +601,21 @@ public class OzoneBucket extends WithMetadata {
         .createKey(volumeName, name, key, size, replicationConfig, keyMetadata);
   }
 
+  /**
+   * Creates a new key in the bucket, with default replication type RATIS and
+   * with replication factor THREE.
+   *
+   * @param key  Name of the key to be created.
+   * @param size Size of the data the key will point to.
+   * @return OzoneOutputStream to which the data has to be written.
+   * @throws IOException
+   */
+  public OzoneDataStreamOutput createStreamKey(String key, long size)
+      throws IOException {
+    return createStreamKey(key, size, defaultReplication,
+        Collections.emptyMap());
+  }
+
   /**
    * Creates a new key in the bucket.
    *
@@ -610,12 +626,13 @@ public class OzoneBucket extends WithMetadata {
    * @throws IOException
    */
   public OzoneDataStreamOutput createStreamKey(String key, long size,
-      ReplicationConfig replicationConfig,
-      Map<String, String> keyMetadata)
+      ReplicationConfig replicationConfig, Map<String, String> keyMetadata)
       throws IOException {
-    return proxy
-        .createStreamKey(volumeName, name, key, size, replicationConfig,
-            keyMetadata);
+    if (replicationConfig == null) {
+      replicationConfig = defaultReplication;
+    }
+    return proxy.createStreamKey(volumeName, name, key, size,
+        replicationConfig, keyMetadata);
   }
 
   /**
@@ -958,9 +975,8 @@ public class OzoneBucket extends WithMetadata {
   public OzoneDataStreamOutput createStreamFile(String keyName, long size,
       ReplicationConfig replicationConfig, boolean overWrite,
       boolean recursive) throws IOException {
-    return proxy
-        .createStreamFile(volumeName, name, keyName, size, replicationConfig,
-            overWrite, recursive);
+    return proxy.createStreamFile(volumeName, name, keyName, size,
+        replicationConfig, overWrite, recursive);
   }
 
   /**
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
index 65551bc4e9..ca26ca177c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
@@ -18,21 +18,29 @@
 
 package org.apache.hadoop.ozone.client.rpc;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.BucketArgs;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
@@ -42,8 +50,10 @@ import org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
@@ -208,4 +218,70 @@ public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract {
     Assert.assertEquals(valueLength, partInfo.getSize());
 
   }
+
+  @Test
+  public void testUploadWithStreamAndMemoryMappedBuffer() throws IOException {
+    // create a local dir
+    final String dir = GenericTestUtils.getTempPath(
+        getClass().getSimpleName());
+    GenericTestUtils.assertDirCreation(new File(dir));
+
+    // create a local file
+    final int chunkSize = 1024;
+    final byte[] data = new byte[8 * chunkSize];
+    ThreadLocalRandom.current().nextBytes(data);
+    final File file = new File(dir, "data");
+    try (FileOutputStream out = new FileOutputStream(file)) {
+      out.write(data);
+    }
+
+    // create a volume
+    final String volumeName = "vol-" + UUID.randomUUID();
+    getStore().createVolume(volumeName);
+    final OzoneVolume volume = getStore().getVolume(volumeName);
+
+    // create a bucket
+    final String bucketName = "buck-" + UUID.randomUUID();
+    final BucketArgs bucketArgs = BucketArgs.newBuilder()
+        .setDefaultReplicationConfig(
+            new DefaultReplicationConfig(ReplicationType.RATIS, THREE))
+        .build();
+    volume.createBucket(bucketName, bucketArgs);
+    final OzoneBucket bucket = volume.getBucket(bucketName);
+
+    // upload a key from the local file using memory-mapped buffers
+    final String keyName = "key-" + UUID.randomUUID();
+    try (RandomAccessFile raf = new RandomAccessFile(file, "r");
+         OzoneDataStreamOutput out = bucket.createStreamKey(
+             keyName, data.length)) {
+      final FileChannel channel = raf.getChannel();
+      long off = 0;
+      for (long len = raf.length(); len > 0;) {
+        final long writeLen = Math.min(len, chunkSize);
+        final ByteBuffer mapped = channel.map(FileChannel.MapMode.READ_ONLY,
+            off, writeLen);
+        out.write(mapped);
+        off += writeLen;
+        len -= writeLen;
+      }
+    }
+
+    // verify the key details
+    final OzoneKeyDetails keyDetails = bucket.getKey(keyName);
+    Assertions.assertEquals(keyName, keyDetails.getName());
+    Assertions.assertEquals(data.length, keyDetails.getDataSize());
+
+    // verify the key content
+    final byte[] buffer = new byte[data.length];
+    try (OzoneInputStream in = keyDetails.getContent()) {
+      for (int off = 0; off < data.length;) {
+        final int n = in.read(buffer, off, data.length - off);
+        if (n < 0) {
+          break;
+        }
+        off += n;
+      }
+    }
+    Assertions.assertArrayEquals(data, buffer);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org