Posted to commits@ozone.apache.org by bh...@apache.org on 2020/04/29 05:57:11 UTC

[hadoop-ozone] branch master updated: HDDS-3223. Improve s3g read 1GB object efficiency by 100 times (#843)

This is an automated email from the ASF dual-hosted git repository.

bharat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fbb045  HDDS-3223. Improve s3g read 1GB object efficiency by 100 times (#843)
5fbb045 is described below

commit 5fbb045dc991776bffd98f7cf5804d7ed14c8b53
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Wed Apr 29 13:56:58 2020 +0800

    HDDS-3223. Improve s3g read 1GB object efficiency by 100 times (#843)
---
 .../hadoop/hdds/scm/XceiverClientManager.java      |  9 ++++
 .../hadoop/hdds/scm/XceiverClientMetrics.java      |  9 ++++
 .../hadoop/ozone/client/io/KeyInputStream.java     | 61 +++++++++++++++++++++
 .../ozone/client/rpc/TestKeyInputStream.java       | 62 ++++++++++++++++++++++
 .../hadoop/ozone/s3/endpoint/ObjectEndpoint.java   | 15 ++++--
 .../hadoop/ozone/s3/io/S3WrapperInputStream.java   | 31 +++++++++++
 .../s3/endpoint/TestMultipartUploadWithCopy.java   | 10 +---
 7 files changed, 183 insertions(+), 14 deletions(-)
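
The gist of the patch: the S3 gateway previously served ranged reads through commons-io IOUtils.copyLarge, which skips the first startOffset bytes by reading and discarding them, while the new KeyInputStream#copyLarge seeks to the offset instead, so the skipped bytes are never fetched. The following stand-alone sketch (file-backed, with hypothetical names such as SeekCopyDemo and copyRange; not part of the patch) shows the seek-then-copy pattern under those assumptions:

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;

// Stand-alone illustration of the seek-then-copy pattern (hypothetical names,
// file-backed only so the example is runnable).
public final class SeekCopyDemo {

  // Position the source at 'offset' with seek(), then stream at most 'len'
  // bytes through a fixed buffer. The skipped prefix is never read.
  static long copyRange(RandomAccessFile source, OutputStream out,
      long offset, long len, byte[] buffer) throws IOException {
    source.seek(offset);                        // reposition without reading
    long copied = 0;
    int toRead = (int) Math.min(buffer.length, len < 0 ? buffer.length : len);
    while (toRead > 0) {
      int n = source.read(buffer, 0, toRead);
      if (n < 0) {
        break;                                  // end of file
      }
      out.write(buffer, 0, n);
      copied += n;
      if (len >= 0) {
        toRead = (int) Math.min(len - copied, buffer.length);
      }
    }
    return copied;
  }

  public static void main(String[] args) throws IOException {
    File tmp = File.createTempFile("seek-copy-demo", ".bin");
    tmp.deleteOnExit();
    try (FileOutputStream fos = new FileOutputStream(tmp)) {
      fos.write(new byte[1 << 20]);             // 1 MiB of stand-in data
    }
    try (RandomAccessFile source = new RandomAccessFile(tmp, "r")) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      long copied = copyRange(source, out, 512 * 1024, 4096, new byte[4096]);
      System.out.println("copied " + copied + " bytes");  // copied 4096 bytes
    }
  }
}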

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index 0cfaca7..2eb2f9e 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -294,6 +294,15 @@ public class XceiverClientManager implements Closeable {
   }
 
   /**
+   * Reset xceiver client metrics.
+   */
+  public static synchronized void resetXceiverClientMetrics() {
+    if (metrics != null) {
+      metrics.reset();
+    }
+  }
+
+  /**
    * Configuration for HDDS client.
    */
   @ConfigGroup(prefix = "scm.container.client")
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
index 7ca89ec..5307646 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
@@ -45,6 +45,10 @@ public class XceiverClientMetrics {
   private MetricsRegistry registry;
 
   public XceiverClientMetrics() {
+    init();
+  }
+
+  public void init() {
     int numEnumEntries = ContainerProtos.Type.values().length;
     this.registry = new MetricsRegistry(SOURCE_NAME);
 
@@ -106,6 +110,11 @@ public class XceiverClientMetrics {
     return opsArray[type.ordinal()].value();
   }
 
+  @VisibleForTesting
+  public void reset() {
+    init();
+  }
+
   public void unRegister() {
     MetricsSystem ms = DefaultMetricsSystem.instance();
     ms.unregisterSource(SOURCE_NAME);
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index 8e375bf..4af6838 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.client.io;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -322,4 +324,63 @@ public class KeyInputStream extends InputStream implements Seekable {
   public long getRemainingOfIndex(int index) throws IOException {
     return blockStreams.get(index).getRemaining();
   }
+
+  /**
+   * Copies some or all bytes from this stream, which may be large (over
+   * 2GB), to an <code>OutputStream</code>, optionally skipping input bytes.
+   * <p>
+   * This method is adapted from commons-io <code>IOUtils</code> so that the
+   * skip is done by seek rather than by read. The reason why commons-io
+   * <code>IOUtils</code> implements skip by read can be found at
+   * <a href="https://issues.apache.org/jira/browse/IO-203">IO-203</a>.
+   * </p>
+   * <p>
+   * This method uses the provided buffer, so there is no need to use a
+   * <code>BufferedInputStream</code>.
+   * </p>
+   *
+   * @param output the <code>OutputStream</code> to write to
+   * @param inputOffset number of bytes to skip from the input before
+   * copying; negative values are ignored
+   * @param len number of bytes to copy; negative means copy to the end
+   * @param buffer the buffer to use for the copy
+   * @return the number of bytes copied
+   * @throws NullPointerException if the input or output is null
+   * @throws IOException          if an I/O error occurs
+   */
+  public long copyLarge(final OutputStream output,
+      final long inputOffset, final long len, final byte[] buffer)
+      throws IOException {
+    if (inputOffset > 0) {
+      seek(inputOffset);
+    }
+
+    if (len == 0) {
+      return 0;
+    }
+
+    final int bufferLength = buffer.length;
+    int bytesToRead = bufferLength;
+    if (len > 0 && len < bufferLength) {
+      bytesToRead = (int) len;
+    }
+
+    int read;
+    long totalRead = 0;
+    while (bytesToRead > 0) {
+      read = read(buffer, 0, bytesToRead);
+      if (read == IOUtils.EOF) {
+        break;
+      }
+
+      output.write(buffer, 0, read);
+      totalRead += read;
+      if (len > 0) { // only adjust len if not reading to the end
+        // Note the cast must work because buffer.length is an integer
+        bytesToRead = (int) Math.min(len - totalRead, bufferLength);
+      }
+    }
+
+    return totalRead;
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
index 2779e7f..75b6055 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -37,6 +37,7 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Random;
 import java.util.UUID;
@@ -223,6 +224,7 @@ public class TestKeyInputStream {
 
   @Test
   public void testSeek() throws Exception {
+    XceiverClientManager.resetXceiverClientMetrics();
     XceiverClientMetrics metrics = XceiverClientManager
         .getXceiverClientMetrics();
     long writeChunkCount = metrics.getContainerOpCountMetrics(
@@ -273,4 +275,64 @@ public class TestKeyInputStream {
       Assert.assertEquals(inputData[chunkSize + 50 + i], readData[i]);
     }
   }
+
+  @Test
+  public void testCopyLarge() throws Exception {
+    String keyName = getKeyName();
+    OzoneOutputStream key = TestHelper.createKey(keyName,
+        ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
+
+    // write data spanning 3 blocks
+    int dataLength = (2 * blockSize) + (blockSize / 2);
+
+    byte[] inputData = new byte[dataLength];
+    Random rand = new Random();
+    for (int i = 0; i < dataLength; i++) {
+      inputData[i] = (byte) rand.nextInt(127);
+    }
+    key.write(inputData);
+    key.close();
+
+    // test with random start and random length
+    for (int i = 0; i < 100; i++) {
+      int inputOffset = rand.nextInt(dataLength - 1);
+      int length = rand.nextInt(dataLength - inputOffset);
+
+      KeyInputStream keyInputStream = (KeyInputStream) objectStore
+          .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
+          .getInputStream();
+      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+      keyInputStream.copyLarge(outputStream, inputOffset, length,
+          new byte[4096]);
+      byte[] readData = outputStream.toByteArray();
+      keyInputStream.close();
+      outputStream.close();
+
+      for (int j = inputOffset; j < inputOffset + length; j++) {
+        Assert.assertEquals(readData[j - inputOffset], inputData[j]);
+      }
+    }
+
+    // test with random start and -ve length
+    for (int i = 0; i < 10; i++) {
+      int inputOffset = rand.nextInt(dataLength - 1);
+      int length = -1;
+
+      KeyInputStream keyInputStream = (KeyInputStream) objectStore
+          .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
+          .getInputStream();
+      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+      keyInputStream.copyLarge(outputStream, inputOffset, length,
+          new byte[4096]);
+      byte[] readData = outputStream.toByteArray();
+      keyInputStream.close();
+      outputStream.close();
+
+      for (int j = inputOffset; j < dataLength; j++) {
+        Assert.assertEquals(readData[j - inputOffset], inputData[j]);
+      }
+    }
+  }
 }
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 455e734..f695fcb 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -259,8 +259,7 @@ public class ObjectEndpoint extends EndpointBase {
           try (S3WrapperInputStream s3WrapperInputStream =
               new S3WrapperInputStream(
                   key.getInputStream())) {
-            IOUtils.copyLarge(s3WrapperInputStream, dest, startOffset,
-                copyLength);
+            s3WrapperInputStream.copyLarge(dest, startOffset, copyLength);
           }
         };
         responseBuilder = Response
@@ -534,10 +533,16 @@ public class ObjectEndpoint extends EndpointBase {
             if (range != null) {
               RangeHeader rangeHeader =
                   RangeHeaderParserUtil.parseRangeHeader(range, 0);
-              IOUtils.copyLarge(sourceObject, ozoneOutputStream,
-                  rangeHeader.getStartOffset(),
-                  rangeHeader.getEndOffset() - rangeHeader.getStartOffset());
 
+              long copyLength = rangeHeader.getEndOffset() -
+                  rangeHeader.getStartOffset();
+
+              try (S3WrapperInputStream s3WrapperInputStream =
+                  new S3WrapperInputStream(
+                  sourceObject.getInputStream())) {
+                s3WrapperInputStream.copyLarge(ozoneOutputStream,
+                    rangeHeader.getStartOffset(), copyLength);
+              }
             } else {
               IOUtils.copy(sourceObject, ozoneOutputStream);
             }
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java
index 9efcc87..edf90ed 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java
@@ -23,12 +23,14 @@ import org.apache.hadoop.ozone.client.io.KeyInputStream;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 
 /**
  * S3Wrapper Input Stream which encapsulates KeyInputStream from ozone.
  */
 public class S3WrapperInputStream extends FSInputStream {
   private final KeyInputStream inputStream;
+  private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
 
   /**
    * Constructs S3WrapperInputStream with KeyInputStream.
@@ -76,4 +78,33 @@ public class S3WrapperInputStream extends FSInputStream {
   public boolean seekToNewSource(long targetPos) throws IOException {
     return false;
   }
+
+  /**
+   * Copies some or all bytes from this stream, which may be large (over
+   * 2GB), to an <code>OutputStream</code>, optionally skipping input bytes.
+   * <p>
+   * This method is adapted from commons-io <code>IOUtils</code> so that the
+   * skip is done by seek rather than by read. The reason why commons-io
+   * <code>IOUtils</code> implements skip by read can be found at
+   * <a href="https://issues.apache.org/jira/browse/IO-203">IO-203</a>.
+   * </p>
+   * <p>
+   * This method buffers the input internally, so there is no need to use a
+   * <code>BufferedInputStream</code>.
+   * </p>
+   * The buffer size is given by {@link #DEFAULT_BUFFER_SIZE}.
+   *
+   * @param output the <code>OutputStream</code> to write to
+   * @param inputOffset number of bytes to skip from the input before
+   * copying; negative values are ignored
+   * @param length number of bytes to copy; negative means copy to the end
+   * @return the number of bytes copied
+   * @throws NullPointerException if the input or output is null
+   * @throws IOException          if an I/O error occurs
+   */
+  public long copyLarge(final OutputStream output, final long inputOffset,
+      final long length) throws IOException {
+    return inputStream.copyLarge(output, inputOffset, length,
+        new byte[DEFAULT_BUFFER_SIZE]);
+  }
 }
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java
index f688ff9..36fa70b 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java
@@ -111,13 +111,6 @@ public class TestMultipartUploadWithCopy {
             OzoneConsts.S3_BUCKET + "/" + EXISTING_KEY, null);
     partsList.add(part2);
 
-    partNumber = 3;
-    Part part3 =
-        uploadPartWithCopy(KEY, uploadID, partNumber,
-            OzoneConsts.S3_BUCKET + "/" + EXISTING_KEY,
-            "bytes=" + RANGE_FROM + "-" + RANGE_TO);
-    partsList.add(part3);
-
     // complete multipart upload
     CompleteMultipartUploadRequest completeMultipartUploadRequest = new
         CompleteMultipartUploadRequest();
@@ -130,8 +123,7 @@ public class TestMultipartUploadWithCopy {
         OzoneConsts.S3_BUCKET);
     try (InputStream is = bucket.readKey(KEY)) {
       String keyContent = new Scanner(is).useDelimiter("\\A").next();
-      Assert.assertEquals(content + EXISTING_KEY_CONTENT + EXISTING_KEY_CONTENT
-          .substring(RANGE_FROM, RANGE_TO), keyContent);
+      Assert.assertEquals(content + EXISTING_KEY_CONTENT, keyContent);
     }
   }
 


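For reference, a hypothetical caller-side sketch of the new API, using the same call chain as TestKeyInputStream above (the class RangedKeyRead and its copyRange method are illustrative, not part of the patch):

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.io.KeyInputStream;

// Hypothetical helper; mirrors the read path exercised in TestKeyInputStream.
public final class RangedKeyRead {

  private static final int COPY_BUFFER_SIZE = 32 * 1024;

  // Streams 'length' bytes of the key starting at 'startOffset' into 'out'
  // without reading the bytes before startOffset; a negative length copies
  // to the end of the key.
  public static long copyRange(ObjectStore store, String volume, String bucket,
      String key, long startOffset, long length, OutputStream out)
      throws IOException {
    try (KeyInputStream in = (KeyInputStream) store.getVolume(volume)
        .getBucket(bucket).readKey(key).getInputStream()) {
      return in.copyLarge(out, startOffset, length,
          new byte[COPY_BUFFER_SIZE]);
    }
  }
}
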
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org