Posted to commits@druid.apache.org by ro...@apache.org on 2023/02/07 08:47:49 UTC

[druid] branch master updated: Robust handling and management of S3 streams for MSQ shuffle storage (#13741)

This is an automated email from the ASF dual-hosted git repository.

rohangarg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a0f8889f23 Robust handling and management of S3 streams for MSQ shuffle storage (#13741)
a0f8889f23 is described below

commit a0f8889f2316b57592e2e43c28aacfa2b92cd1ab
Author: Rohan Garg <77...@users.noreply.github.com>
AuthorDate: Tue Feb 7 14:17:37 2023 +0530

    Robust handling and management of S3 streams for MSQ shuffle storage (#13741)
---
 .../storage/s3/output/S3StorageConnector.java      | 158 +++++++++++++++++----
 .../storage/s3/output/S3StorageConnectorTest.java  |  13 +-
 2 files changed, 142 insertions(+), 29 deletions(-)
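
For readers skimming the diff below: the core change replaces a single streaming GET with chunked downloads. The object (or the requested range) is split into pieces of at most DOWNLOAD_MAX_CHUNK_SIZE bytes, each piece is copied to a local temp file with retries, and the per-chunk streams are stitched back together with java.io.SequenceInputStream. A minimal sketch of that stitching pattern, using an in-memory byte array in place of S3; ChunkedStreamSketch and chunkedRead are illustrative names, not part of the commit:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Enumeration;
import java.util.concurrent.atomic.AtomicLong;

public class ChunkedStreamSketch
{
  // Stitches fixed-size chunks of a source into one logical stream, the same
  // SequenceInputStream-over-Enumeration shape the connector now uses.
  static InputStream chunkedRead(byte[] source, int chunkSize)
  {
    AtomicLong pos = new AtomicLong(0);
    return new SequenceInputStream(new Enumeration<InputStream>()
    {
      @Override
      public boolean hasMoreElements()
      {
        // keep going until the whole "object" has been handed out
        return pos.get() < source.length;
      }

      @Override
      public InputStream nextElement()
      {
        int start = (int) pos.get();
        int end = Math.min(start + chunkSize, source.length);
        pos.set(end);
        // The real connector downloads bytes [start, end - 1] from S3 into a
        // temp file here and returns a FileInputStream over that file.
        return new ByteArrayInputStream(source, start, end - start);
      }
    });
  }

  public static void main(String[] args) throws IOException
  {
    InputStream in = chunkedRead("hello chunked world".getBytes(), 5);
    System.out.println(new String(in.readAllBytes())); // hello chunked world
  }
}

SequenceInputStream closes each element stream once it is exhausted, which is what lets the connector delete each temp file as soon as the reader moves past it.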

diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
index 184e1bdfe1..ea583290af 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
@@ -25,19 +25,33 @@ import com.amazonaws.services.s3.model.ListObjectsV2Request;
 import com.amazonaws.services.s3.model.ListObjectsV2Result;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
 import org.apache.druid.data.input.impl.RetryingInputStream;
 import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
+import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.storage.StorageConnector;
 import org.apache.druid.storage.s3.S3Utils;
 import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
 
 import javax.annotation.Nonnull;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.SequenceInputStream;
 import java.util.ArrayList;
+import java.util.Enumeration;
 import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 public class S3StorageConnector implements StorageConnector
@@ -48,11 +62,23 @@ public class S3StorageConnector implements StorageConnector
 
   private static final String DELIM = "/";
   private static final Joiner JOINER = Joiner.on(DELIM).skipNulls();
+  private static final long DOWNLOAD_MAX_CHUNK_SIZE = 100_000_000;
 
   public S3StorageConnector(S3OutputConfig config, ServerSideEncryptingAmazonS3 serverSideEncryptingAmazonS3)
   {
     this.config = config;
     this.s3Client = serverSideEncryptingAmazonS3;
+    Preconditions.checkNotNull(config, "config is null");
+    Preconditions.checkNotNull(config.getTempDir(), "tempDir is null in s3 config");
+    try {
+      FileUtils.mkdirp(config.getTempDir());
+    }
+    catch (IOException e) {
+      throw new RE(
+          e,
+          StringUtils.format("Cannot create tempDir : [%s] for s3 storage connector", config.getTempDir())
+      );
+    }
   }
 
   @Override
@@ -62,13 +88,13 @@ public class S3StorageConnector implements StorageConnector
   }
 
   @Override
-  public InputStream read(String path) throws IOException
+  public InputStream read(String path)
   {
-    return buildInputStream(new GetObjectRequest(config.getBucket(), objectPath(path)));
+    return buildInputStream(new GetObjectRequest(config.getBucket(), objectPath(path)), path);
   }
 
   @Override
-  public InputStream readRange(String path, long from, long size) throws IOException
+  public InputStream readRange(String path, long from, long size)
   {
     if (from < 0 || size < 0) {
       throw new IAE(
@@ -78,35 +104,115 @@ public class S3StorageConnector implements StorageConnector
           size
       );
     }
-    return buildInputStream(new GetObjectRequest(config.getBucket(), objectPath(path)).withRange(from, from + size - 1));
+    return buildInputStream(
+        new GetObjectRequest(config.getBucket(), objectPath(path)).withRange(from, from + size - 1),
+        path
+    );
   }
 
-  private RetryingInputStream buildInputStream(GetObjectRequest getObjectRequest) throws IOException
+  private InputStream buildInputStream(GetObjectRequest getObjectRequest, String path)
   {
-    return new RetryingInputStream<>(
-        getObjectRequest,
-        new ObjectOpenFunction<GetObjectRequest>()
-        {
-          @Override
-          public InputStream open(GetObjectRequest object)
-          {
-            return s3Client.getObject(object).getObjectContent();
-          }
+    // fetch the size of the whole object to make chunks
+    long readEnd;
+    AtomicLong currReadStart = new AtomicLong(0);
+    if (getObjectRequest.getRange() != null) {
+      currReadStart.set(getObjectRequest.getRange()[0]);
+      readEnd = getObjectRequest.getRange()[1] + 1;
+    } else {
+      readEnd = this.s3Client.getObjectMetadata(config.getBucket(), objectPath(path)).getInstanceLength();
+    }
 
-          @Override
-          public InputStream open(GetObjectRequest object, long offset)
-          {
-            final GetObjectRequest offsetObjectRequest = new GetObjectRequest(
-                object.getBucketName(),
-                object.getKey()
+    // build a sequence input stream from chunks
+    return new SequenceInputStream(new Enumeration<InputStream>()
+    {
+      @Override
+      public boolean hasMoreElements()
+      {
+        // don't stop until the whole object is downloaded
+        return currReadStart.get() < readEnd;
+      }
+
+      @Override
+      public InputStream nextElement()
+      {
+        File outFile = new File(config.getTempDir().getAbsolutePath(), UUID.randomUUID().toString());
+        // in a single chunk, only download a maximum of DOWNLOAD_MAX_CHUNK_SIZE
+        long endPoint = Math.min(currReadStart.get() + DOWNLOAD_MAX_CHUNK_SIZE, readEnd) - 1;
+        try {
+          if (!outFile.createNewFile()) {
+            throw new IOE(
+                StringUtils.format(
+                    "Could not create temporary file [%s] for copying [%s]",
+                    outFile.getAbsolutePath(),
+                    objectPath(path)
+                )
             );
-            offsetObjectRequest.setRange(offset);
-            return open(offsetObjectRequest);
           }
-        },
-        S3Utils.S3RETRY,
-        config.getMaxRetry()
-    );
+          FileUtils.copyLarge(
+              () -> new RetryingInputStream<>(
+                  new GetObjectRequest(
+                      config.getBucket(),
+                      objectPath(path)
+                  ).withRange(currReadStart.get(), endPoint),
+                  new ObjectOpenFunction<GetObjectRequest>()
+                  {
+                    @Override
+                    public InputStream open(GetObjectRequest object)
+                    {
+                      return s3Client.getObject(object).getObjectContent();
+                    }
+
+                    @Override
+                    public InputStream open(GetObjectRequest object, long offset)
+                    {
+                      if (object.getRange() != null) {
+                        long[] oldRange = object.getRange();
+                        object.setRange(oldRange[0] + offset, oldRange[1]);
+                      } else {
+                        object.setRange(offset);
+                      }
+                      return open(object);
+                    }
+                  },
+                  S3Utils.S3RETRY,
+                  3
+              ),
+              outFile,
+              new byte[8 * 1024],
+              Predicates.alwaysFalse(),
+              1,
+              StringUtils.format("Retrying copying of [%s] to [%s]", objectPath(path), outFile.getAbsolutePath())
+          );
+        }
+        catch (IOException e) {
+          throw new RE(e, StringUtils.format("Unable to copy [%s] to [%s]", objectPath(path), outFile));
+        }
+        try {
+          AtomicBoolean isClosed = new AtomicBoolean(false);
+          return new FileInputStream(outFile)
+          {
+            @Override
+            public void close() throws IOException
+            {
+              // close should be idempotent
+              if (isClosed.get()) {
+                return;
+              }
+              isClosed.set(true);
+              super.close();
+              // since endPoint is inclusive in s3's get request API, the next currReadStart is endpoint + 1
+              currReadStart.set(endPoint + 1);
+              if (!outFile.delete()) {
+                throw new RE("Cannot delete temp file [%s]", outFile);
+              }
+            }
+          };
+        }
+        catch (FileNotFoundException e) {
+          throw new RE(e, StringUtils.format("Unable to find temp file [%s]", outFile));
+        }
+      }
+    });
   }
 
   @Override
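
A note on the retry path in the hunk above: when RetryingInputStream reopens a chunk after offset bytes have already been read, the ranged request is advanced so the retry resumes at the failure point instead of re-downloading the chunk from its start. S3 ranges are inclusive on both ends, so only the start moves. A sketch of just that arithmetic, with a plain long[] standing in for GetObjectRequest's range and advanceRange as an illustrative name:

// Given an inclusive byte range [start, end] and offset bytes already
// consumed, the reopened request should cover [start + offset, end].
static long[] advanceRange(long[] range, long offset)
{
  if (range != null) {
    return new long[]{range[0] + offset, range[1]};
  }
  // No explicit range: resume from offset to end of object, the analogue of
  // GetObjectRequest#setRange(long start); MAX_VALUE is only a stand-in here.
  return new long[]{offset, Long.MAX_VALUE};
}

// advanceRange(new long[]{100, 199}, 30) yields [130, 199].
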
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java
index 154918cc2d..0a02dce4d2 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java
@@ -24,6 +24,7 @@ import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.ListObjectsV2Request;
 import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.google.common.collect.ImmutableList;
@@ -104,9 +105,15 @@ public class S3StorageConnectorTest
   public void pathRead() throws IOException
   {
     EasyMock.reset(S3_CLIENT);
+    ObjectMetadata objectMetadata = new ObjectMetadata();
+    long contentLength = "test".getBytes(StandardCharsets.UTF_8).length;
+    objectMetadata.setContentLength(contentLength);
     S3Object s3Object = new S3Object();
     s3Object.setObjectContent(new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8)));
-    EasyMock.expect(S3_CLIENT.getObject(new GetObjectRequest(BUCKET, PREFIX + "/" + TEST_FILE))).andReturn(s3Object);
+    EasyMock.expect(S3_CLIENT.getObjectMetadata(EasyMock.anyObject())).andReturn(objectMetadata);
+    EasyMock.expect(S3_CLIENT.getObject(
+        new GetObjectRequest(BUCKET, PREFIX + "/" + TEST_FILE).withRange(0, contentLength - 1))
+    ).andReturn(s3Object);
     EasyMock.replay(S3_CLIENT);
 
     String readText = new BufferedReader(
@@ -141,8 +148,8 @@ public class S3StorageConnectorTest
 
         InputStream is = storageConnector.readRange(TEST_FILE, start, length);
         byte[] dataBytes = new byte[length];
-        Assert.assertEquals(is.read(dataBytes), length);
-        Assert.assertEquals(is.read(), -1); // reading further produces no data
+        Assert.assertEquals(length, is.read(dataBytes));
+        Assert.assertEquals(-1, is.read()); // reading further produces no data
         Assert.assertEquals(dataQueried, new String(dataBytes, StandardCharsets.UTF_8));
         EasyMock.reset(S3_CLIENT);
       }
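
The mock changes above follow directly from the chunking: even a plain read(path) now issues a ranged GET covering [0, contentLength - 1], so the test has to expect a getObjectMetadata call (to learn the object length) in addition to the ranged getObject. The boundary arithmetic itself, sketched with illustrative numbers (printChunks is not part of the commit):

static void printChunks(long readStart, long readEnd, long maxChunk)
{
  long curr = readStart;
  while (curr < readEnd) {
    // S3 ranges are inclusive, so each chunk covers [curr, endPoint]
    long endPoint = Math.min(curr + maxChunk, readEnd) - 1;
    System.out.printf("range [%d, %d]%n", curr, endPoint);
    curr = endPoint + 1; // the next chunk resumes one past the inclusive end
  }
}

// printChunks(0, 250_000_000, 100_000_000) prints:
//   range [0, 99999999]
//   range [100000000, 199999999]
//   range [200000000, 249999999]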

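Finally, the per-chunk cleanup in the main hunk relies on a FileInputStream whose close() is idempotent and deletes its backing temp file; the connector's version additionally advances the read cursor there so the next chunk starts at endPoint + 1. A standalone sketch of the pattern under an illustrative name (DeleteOnCloseFileInputStream is not in the commit):

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

class DeleteOnCloseFileInputStream extends FileInputStream
{
  private final File file;
  private final AtomicBoolean closed = new AtomicBoolean(false);

  DeleteOnCloseFileInputStream(File file) throws IOException
  {
    super(file);
    this.file = file;
  }

  @Override
  public void close() throws IOException
  {
    // compareAndSet makes repeated close() calls a safe no-op; both
    // SequenceInputStream and outer callers may end up closing the stream
    if (!closed.compareAndSet(false, true)) {
      return;
    }
    super.close();
    if (!file.delete()) {
      throw new IOException("Cannot delete temp file [" + file + "]");
    }
  }
}

Using compareAndSet rather than a separate get() and set(true), as the sketch does, also keeps the guard race-free if two threads happen to close concurrently.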
