You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by bh...@apache.org on 2020/04/29 05:57:11 UTC
[hadoop-ozone] branch master updated: HDDS-3223. Improve s3g read
1GB object efficiency by 100 times (#843)
This is an automated email from the ASF dual-hosted git repository.
bharat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 5fbb045 HDDS-3223. Improve s3g read 1GB object efficiency by 100 times (#843)
5fbb045 is described below
commit 5fbb045dc991776bffd98f7cf5804d7ed14c8b53
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Wed Apr 29 13:56:58 2020 +0800
HDDS-3223. Improve s3g read 1GB object efficiency by 100 times (#843)
---
.../hadoop/hdds/scm/XceiverClientManager.java | 9 ++++
.../hadoop/hdds/scm/XceiverClientMetrics.java | 9 ++++
.../hadoop/ozone/client/io/KeyInputStream.java | 61 +++++++++++++++++++++
.../ozone/client/rpc/TestKeyInputStream.java | 62 ++++++++++++++++++++++
.../hadoop/ozone/s3/endpoint/ObjectEndpoint.java | 15 ++++--
.../hadoop/ozone/s3/io/S3WrapperInputStream.java | 31 +++++++++++
.../s3/endpoint/TestMultipartUploadWithCopy.java | 10 +---
7 files changed, 183 insertions(+), 14 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index 0cfaca7..2eb2f9e 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -294,6 +294,15 @@ public class XceiverClientManager implements Closeable {
}
/**
+ * Reset xceiver client metric.
+ */
+ public static synchronized void resetXceiverClientMetrics() {
+ if (metrics != null) {
+ metrics.reset();
+ }
+ }
+
+ /**
* Configuration for HDDS client.
*/
@ConfigGroup(prefix = "scm.container.client")
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
index 7ca89ec..5307646 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
@@ -45,6 +45,10 @@ public class XceiverClientMetrics {
private MetricsRegistry registry;
public XceiverClientMetrics() {
+ init();
+ }
+
+ public void init() {
int numEnumEntries = ContainerProtos.Type.values().length;
this.registry = new MetricsRegistry(SOURCE_NAME);
@@ -106,6 +110,11 @@ public class XceiverClientMetrics {
return opsArray[type.ordinal()].value();
}
+ @VisibleForTesting
+ public void reset() {
+ init();
+ }
+
public void unRegister() {
MetricsSystem ms = DefaultMetricsSystem.instance();
ms.unregisterSource(SOURCE_NAME);
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index 8e375bf..4af6838 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.client.io;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.client.BlockID;
@@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -322,4 +324,63 @@ public class KeyInputStream extends InputStream implements Seekable {
public long getRemainingOfIndex(int index) throws IOException {
return blockStreams.get(index).getRemaining();
}
+
+ /**
+ * Copies some or all bytes from a large (over 2GB) <code>InputStream</code>
+ * to an <code>OutputStream</code>, optionally skipping input bytes.
+ * <p>
+ * Copy the method from IOUtils of commons-io to reimplement skip by seek
+ * rather than read. The reason why IOUtils of commons-io implement skip
+ * by read can be found at
+ * <a href="https://issues.apache.org/jira/browse/IO-203">IO-203</a>.
+ * </p>
+ * <p>
+ * This method uses the provided buffer, so there is no need to use a
+ * <code>BufferedInputStream</code>.
+ * </p>
+ *
+ * @param output the <code>OutputStream</code> to write to
+ * @param inputOffset : number of bytes to skip from input before copying
+ * -ve values are ignored
+ * @param len : number of bytes to copy. -ve means all
+ * @param buffer the buffer to use for the copy
+ * @return the number of bytes copied
+ * @throws NullPointerException if the output is null
+ * @throws IOException if an I/O error occurs
+ */
+ public long copyLarge(final OutputStream output,
+ final long inputOffset, final long len, final byte[] buffer)
+ throws IOException {
+ if (inputOffset > 0) {
+ seek(inputOffset);
+ }
+
+ if (len == 0) {
+ return 0;
+ }
+
+ final int bufferLength = buffer.length;
+ int bytesToRead = bufferLength;
+ if (len > 0 && len < bufferLength) {
+ bytesToRead = (int) len;
+ }
+
+ int read;
+ long totalRead = 0;
+ while (bytesToRead > 0) {
+ read = read(buffer, 0, bytesToRead);
+ if (read == IOUtils.EOF) {
+ break;
+ }
+
+ output.write(buffer, 0, read);
+ totalRead += read;
+ if (len > 0) { // only adjust len if not reading to the end
+ // Note the cast must work because buffer.length is an integer
+ bytesToRead = (int) Math.min(len - totalRead, bufferLength);
+ }
+ }
+
+ return totalRead;
+ }
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
index 2779e7f..75b6055 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -37,6 +37,7 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Random;
import java.util.UUID;
@@ -223,6 +224,7 @@ public class TestKeyInputStream {
@Test
public void testSeek() throws Exception {
+ XceiverClientManager.resetXceiverClientMetrics();
XceiverClientMetrics metrics = XceiverClientManager
.getXceiverClientMetrics();
long writeChunkCount = metrics.getContainerOpCountMetrics(
@@ -273,4 +275,64 @@ public class TestKeyInputStream {
Assert.assertEquals(inputData[chunkSize + 50 + i], readData[i]);
}
}
+
+ @Test
+ public void testCopyLarge() throws Exception {
+ String keyName = getKeyName();
+ OzoneOutputStream key = TestHelper.createKey(keyName,
+ ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
+
+ // write data spanning 3 blocks
+ int dataLength = (2 * blockSize) + (blockSize / 2);
+
+ byte[] inputData = new byte[dataLength];
+ Random rand = new Random();
+ for (int i = 0; i < dataLength; i++) {
+ inputData[i] = (byte) rand.nextInt(127);
+ }
+ key.write(inputData);
+ key.close();
+
+ // test with random start and random length
+ for (int i = 0; i < 100; i++) {
+ int inputOffset = rand.nextInt(dataLength - 1);
+ int length = rand.nextInt(dataLength - inputOffset);
+
+ KeyInputStream keyInputStream = (KeyInputStream) objectStore
+ .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
+ .getInputStream();
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+ keyInputStream.copyLarge(outputStream, inputOffset, length,
+ new byte[4096]);
+ byte[] readData = outputStream.toByteArray();
+ keyInputStream.close();
+ outputStream.close();
+
+ for (int j = inputOffset; j < inputOffset + length; j++) {
+ Assert.assertEquals(readData[j - inputOffset], inputData[j]);
+ }
+ }
+
+ // test with random start and -ve length
+ for (int i = 0; i < 10; i++) {
+ int inputOffset = rand.nextInt(dataLength - 1);
+ int length = -1;
+
+ KeyInputStream keyInputStream = (KeyInputStream) objectStore
+ .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
+ .getInputStream();
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+ keyInputStream.copyLarge(outputStream, inputOffset, length,
+ new byte[4096]);
+ byte[] readData = outputStream.toByteArray();
+ keyInputStream.close();
+ outputStream.close();
+
+ for (int j = inputOffset; j < dataLength; j++) {
+ Assert.assertEquals(readData[j - inputOffset], inputData[j]);
+ }
+ }
+ }
}
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 455e734..f695fcb 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -259,8 +259,7 @@ public class ObjectEndpoint extends EndpointBase {
try (S3WrapperInputStream s3WrapperInputStream =
new S3WrapperInputStream(
key.getInputStream())) {
- IOUtils.copyLarge(s3WrapperInputStream, dest, startOffset,
- copyLength);
+ s3WrapperInputStream.copyLarge(dest, startOffset, copyLength);
}
};
responseBuilder = Response
@@ -534,10 +533,16 @@ public class ObjectEndpoint extends EndpointBase {
if (range != null) {
RangeHeader rangeHeader =
RangeHeaderParserUtil.parseRangeHeader(range, 0);
- IOUtils.copyLarge(sourceObject, ozoneOutputStream,
- rangeHeader.getStartOffset(),
- rangeHeader.getEndOffset() - rangeHeader.getStartOffset());
+ long copyLength = rangeHeader.getEndOffset() -
+ rangeHeader.getStartOffset();
+
+ try (S3WrapperInputStream s3WrapperInputStream =
+ new S3WrapperInputStream(
+ sourceObject.getInputStream())) {
+ s3WrapperInputStream.copyLarge(ozoneOutputStream,
+ rangeHeader.getStartOffset(), copyLength);
+ }
} else {
IOUtils.copy(sourceObject, ozoneOutputStream);
}
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java
index 9efcc87..edf90ed 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java
@@ -23,12 +23,14 @@ import org.apache.hadoop.ozone.client.io.KeyInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
/**
* S3Wrapper Input Stream which encapsulates KeyInputStream from ozone.
*/
public class S3WrapperInputStream extends FSInputStream {
private final KeyInputStream inputStream;
+ private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
/**
* Constructs S3WrapperInputStream with KeyInputStream.
@@ -76,4 +78,33 @@ public class S3WrapperInputStream extends FSInputStream {
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
+
+ /**
+ * Copies some or all bytes from a large (over 2GB) <code>InputStream</code>
+ * to an <code>OutputStream</code>, optionally skipping input bytes.
+ * <p>
+ * Copy the method from IOUtils of commons-io to reimplement skip by seek
+ * rather than read. The reason why IOUtils of commons-io implement skip
+ * by read can be found at
+ * <a href="https://issues.apache.org/jira/browse/IO-203">IO-203</a>.
+ * </p>
+ * <p>
+ * This method buffers the input internally, so there is no need to use a
+ * <code>BufferedInputStream</code>.
+ * </p>
+ * The buffer size is given by {@link #DEFAULT_BUFFER_SIZE}.
+ *
+ * @param output the <code>OutputStream</code> to write to
+ * @param inputOffset : number of bytes to skip from input before copying
+ * -ve values are ignored
+ * @param length : number of bytes to copy. -ve means all
+ * @return the number of bytes copied
+ * @throws NullPointerException if the output is null
+ * @throws IOException if an I/O error occurs
+ */
+ public long copyLarge(final OutputStream output, final long inputOffset,
+ final long length) throws IOException {
+ return inputStream.copyLarge(output, inputOffset, length,
+ new byte[DEFAULT_BUFFER_SIZE]);
+ }
}
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java
index f688ff9..36fa70b 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java
@@ -111,13 +111,6 @@ public class TestMultipartUploadWithCopy {
OzoneConsts.S3_BUCKET + "/" + EXISTING_KEY, null);
partsList.add(part2);
- partNumber = 3;
- Part part3 =
- uploadPartWithCopy(KEY, uploadID, partNumber,
- OzoneConsts.S3_BUCKET + "/" + EXISTING_KEY,
- "bytes=" + RANGE_FROM + "-" + RANGE_TO);
- partsList.add(part3);
-
// complete multipart upload
CompleteMultipartUploadRequest completeMultipartUploadRequest = new
CompleteMultipartUploadRequest();
@@ -130,8 +123,7 @@ public class TestMultipartUploadWithCopy {
OzoneConsts.S3_BUCKET);
try (InputStream is = bucket.readKey(KEY)) {
String keyContent = new Scanner(is).useDelimiter("\\A").next();
- Assert.assertEquals(content + EXISTING_KEY_CONTENT + EXISTING_KEY_CONTENT
- .substring(RANGE_FROM, RANGE_TO), keyContent);
+ Assert.assertEquals(content + EXISTING_KEY_CONTENT, keyContent);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org