You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ro...@apache.org on 2023/02/07 08:47:49 UTC
[druid] branch master updated: Robust handling and management of S3 streams for MSQ shuffle storage (#13741)
This is an automated email from the ASF dual-hosted git repository.
rohangarg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a0f8889f23 Robust handling and management of S3 streams for MSQ shuffle storage (#13741)
a0f8889f23 is described below
commit a0f8889f2316b57592e2e43c28aacfa2b92cd1ab
Author: Rohan Garg <77...@users.noreply.github.com>
AuthorDate: Tue Feb 7 14:17:37 2023 +0530
Robust handling and management of S3 streams for MSQ shuffle storage (#13741)
---
.../storage/s3/output/S3StorageConnector.java | 158 +++++++++++++++++----
.../storage/s3/output/S3StorageConnectorTest.java | 13 +-
2 files changed, 142 insertions(+), 29 deletions(-)
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
index 184e1bdfe1..ea583290af 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
@@ -25,19 +25,33 @@ import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
import org.apache.druid.data.input.impl.RetryingInputStream;
import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
+import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.s3.S3Utils;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import javax.annotation.Nonnull;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.SequenceInputStream;
import java.util.ArrayList;
+import java.util.Enumeration;
import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
public class S3StorageConnector implements StorageConnector
@@ -48,11 +62,23 @@ public class S3StorageConnector implements StorageConnector
private static final String DELIM = "/";
private static final Joiner JOINER = Joiner.on(DELIM).skipNulls();
+ private static final long DOWNLOAD_MAX_CHUNK_SIZE = 100_000_000;
public S3StorageConnector(S3OutputConfig config, ServerSideEncryptingAmazonS3 serverSideEncryptingAmazonS3)
{
this.config = config;
this.s3Client = serverSideEncryptingAmazonS3;
+ Preconditions.checkNotNull(config, "config is null");
+ Preconditions.checkNotNull(config.getTempDir(), "tempDir is null in s3 config");
+ try {
+ FileUtils.mkdirp(config.getTempDir());
+ }
+ catch (IOException e) {
+ throw new RE(
+ e,
+ StringUtils.format("Cannot create tempDir : [%s] for s3 storage connector", config.getTempDir())
+ );
+ }
}
@Override
@@ -62,13 +88,13 @@ public class S3StorageConnector implements StorageConnector
}
@Override
- public InputStream read(String path) throws IOException
+ public InputStream read(String path)
{
- return buildInputStream(new GetObjectRequest(config.getBucket(), objectPath(path)));
+ return buildInputStream(new GetObjectRequest(config.getBucket(), objectPath(path)), path);
}
@Override
- public InputStream readRange(String path, long from, long size) throws IOException
+ public InputStream readRange(String path, long from, long size)
{
if (from < 0 || size < 0) {
throw new IAE(
@@ -78,35 +104,115 @@ public class S3StorageConnector implements StorageConnector
size
);
}
- return buildInputStream(new GetObjectRequest(config.getBucket(), objectPath(path)).withRange(from, from + size - 1));
+ return buildInputStream(
+ new GetObjectRequest(config.getBucket(), objectPath(path)).withRange(from, from + size - 1),
+ path
+ );
}
- private RetryingInputStream buildInputStream(GetObjectRequest getObjectRequest) throws IOException
+ private InputStream buildInputStream(GetObjectRequest getObjectRequest, String path)
{
- return new RetryingInputStream<>(
- getObjectRequest,
- new ObjectOpenFunction<GetObjectRequest>()
- {
- @Override
- public InputStream open(GetObjectRequest object)
- {
- return s3Client.getObject(object).getObjectContent();
- }
+ // fetch the size of the whole object to make chunks
+ long readEnd;
+ AtomicLong currReadStart = new AtomicLong(0);
+ if (getObjectRequest.getRange() != null) {
+ currReadStart.set(getObjectRequest.getRange()[0]);
+ readEnd = getObjectRequest.getRange()[1] + 1;
+ } else {
+ readEnd = this.s3Client.getObjectMetadata(config.getBucket(), objectPath(path)).getInstanceLength();
+ }
- @Override
- public InputStream open(GetObjectRequest object, long offset)
- {
- final GetObjectRequest offsetObjectRequest = new GetObjectRequest(
- object.getBucketName(),
- object.getKey()
+ // build a sequence input stream from chunks
+ return new SequenceInputStream(new Enumeration<InputStream>()
+ {
+ @Override
+ public boolean hasMoreElements()
+ {
+ // don't stop until the whole object is downloaded
+ return currReadStart.get() < readEnd;
+ }
+
+ @Override
+ public InputStream nextElement()
+ {
+ File outFile = new File(config.getTempDir().getAbsolutePath(), UUID.randomUUID().toString());
+ // in a single chunk, only download a maximum of DOWNLOAD_MAX_CHUNK_SIZE
+ long endPoint = Math.min(currReadStart.get() + DOWNLOAD_MAX_CHUNK_SIZE, readEnd) - 1;
+ try {
+ if (!outFile.createNewFile()) {
+ throw new IOE(
+ StringUtils.format(
+ "Could not create temporary file [%s] for copying [%s]",
+ outFile.getAbsolutePath(),
+ objectPath(path)
+ )
);
- offsetObjectRequest.setRange(offset);
- return open(offsetObjectRequest);
}
- },
- S3Utils.S3RETRY,
- config.getMaxRetry()
- );
+ FileUtils.copyLarge(
+ () -> new RetryingInputStream<>(
+ new GetObjectRequest(
+ config.getBucket(),
+ objectPath(path)
+ ).withRange(currReadStart.get(), endPoint),
+ new ObjectOpenFunction<GetObjectRequest>()
+ {
+ @Override
+ public InputStream open(GetObjectRequest object)
+ {
+ return s3Client.getObject(object).getObjectContent();
+ }
+
+ @Override
+ public InputStream open(GetObjectRequest object, long offset)
+ {
+ if (object.getRange() != null) {
+ long[] oldRange = object.getRange();
+ object.setRange(oldRange[0] + offset, oldRange[1]);
+ } else {
+ object.setRange(offset);
+ }
+ return open(object);
+ }
+ },
+ S3Utils.S3RETRY,
+ 3
+ ),
+ outFile,
+ new byte[8 * 1024],
+ Predicates.alwaysFalse(),
+ 1,
+ StringUtils.format("Retrying copying of [%s] to [%s]", objectPath(path), outFile.getAbsolutePath())
+ );
+ }
+ catch (IOException e) {
+ throw new RE(e, StringUtils.format("Unable to copy [%s] to [%s]", objectPath(path), outFile));
+ }
+ try {
+ AtomicBoolean isClosed = new AtomicBoolean(false);
+ return new FileInputStream(outFile)
+ {
+ @Override
+ public void close() throws IOException
+ {
+ // close should be idempotent
+ if (isClosed.get()) {
+ return;
+ }
+ isClosed.set(true);
+ super.close();
+ // since endPoint is inclusive in s3's get request API, the next currReadStart is endpoint + 1
+ currReadStart.set(endPoint + 1);
+ if (!outFile.delete()) {
+ throw new RE("Cannot delete temp file [%s]", outFile);
+ }
+ }
+ };
+ }
+ catch (FileNotFoundException e) {
+ throw new RE(e, StringUtils.format("Unable to find temp file [%s]", outFile));
+ }
+ }
+ });
}
@Override
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java
index 154918cc2d..0a02dce4d2 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java
@@ -24,6 +24,7 @@ import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.collect.ImmutableList;
@@ -104,9 +105,15 @@ public class S3StorageConnectorTest
public void pathRead() throws IOException
{
EasyMock.reset(S3_CLIENT);
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+ long contentLength = "test".getBytes(StandardCharsets.UTF_8).length;
+ objectMetadata.setContentLength(contentLength);
S3Object s3Object = new S3Object();
s3Object.setObjectContent(new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8)));
- EasyMock.expect(S3_CLIENT.getObject(new GetObjectRequest(BUCKET, PREFIX + "/" + TEST_FILE))).andReturn(s3Object);
+ EasyMock.expect(S3_CLIENT.getObjectMetadata(EasyMock.anyObject())).andReturn(objectMetadata);
+ EasyMock.expect(S3_CLIENT.getObject(
+ new GetObjectRequest(BUCKET, PREFIX + "/" + TEST_FILE).withRange(0, contentLength - 1))
+ ).andReturn(s3Object);
EasyMock.replay(S3_CLIENT);
String readText = new BufferedReader(
@@ -141,8 +148,8 @@ public class S3StorageConnectorTest
InputStream is = storageConnector.readRange(TEST_FILE, start, length);
byte[] dataBytes = new byte[length];
- Assert.assertEquals(is.read(dataBytes), length);
- Assert.assertEquals(is.read(), -1); // reading further produces no data
+ Assert.assertEquals(length, is.read(dataBytes));
+ Assert.assertEquals(-1, is.read()); // reading further produces no data
Assert.assertEquals(dataQueried, new String(dataBytes, StandardCharsets.UTF_8));
EasyMock.reset(S3_CLIENT);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org