You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by "imply-cheddar (via GitHub)" <gi...@apache.org> on 2023/02/03 06:23:08 UTC

[GitHub] [druid] imply-cheddar commented on a diff in pull request #13741: Robust handling and management of S3 streams for MSQ shuffle storage

imply-cheddar commented on code in PR #13741:
URL: https://github.com/apache/druid/pull/13741#discussion_r1095389740


##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java:
##########
@@ -48,11 +61,20 @@ public class S3StorageConnector implements StorageConnector
 
   private static final String DELIM = "/";
   private static final Joiner JOINER = Joiner.on(DELIM).skipNulls();
+  private static final long DOWNLOAD_MAX_CHUNK_SIZE = 100_000_000;
 
   public S3StorageConnector(S3OutputConfig config, ServerSideEncryptingAmazonS3 serverSideEncryptingAmazonS3)
   {
     this.config = config;
     this.s3Client = serverSideEncryptingAmazonS3;
+    if (config.getTempDir() != null) {

Review Comment:
   If `getTempDir` is null, this code is still gonna fail, but a lot later on.  You can test and validate here instead.



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java:
##########
@@ -78,35 +100,117 @@ public InputStream readRange(String path, long from, long size) throws IOExcepti
           size
       );
     }
-    return buildInputStream(new GetObjectRequest(config.getBucket(), objectPath(path)).withRange(from, from + size - 1));
+    return buildInputStream(
+        new GetObjectRequest(config.getBucket(), objectPath(path)).withRange(from, from + size - 1),
+        path
+    );
   }
 
-  private RetryingInputStream buildInputStream(GetObjectRequest getObjectRequest) throws IOException
+  private InputStream buildInputStream(GetObjectRequest getObjectRequest, String path)
   {
-    return new RetryingInputStream<>(
-        getObjectRequest,
-        new ObjectOpenFunction<GetObjectRequest>()
-        {
-          @Override
-          public InputStream open(GetObjectRequest object)
-          {
-            return s3Client.getObject(object).getObjectContent();
-          }
+    // fetch the size of the whole object to make chunks
+    long readEnd;
+    AtomicLong currReadStart = new AtomicLong(0);
+    if (getObjectRequest.getRange() != null) {
+      currReadStart.set(getObjectRequest.getRange()[0]);
+      readEnd = getObjectRequest.getRange()[1] + 1;
+    } else {
+      readEnd = this.s3Client.getObjectMetadata(config.getBucket(), objectPath(path)).getInstanceLength();
+    }
 
-          @Override
-          public InputStream open(GetObjectRequest object, long offset)
-          {
-            final GetObjectRequest offsetObjectRequest = new GetObjectRequest(
-                object.getBucketName(),
-                object.getKey()
+    // build a sequence input stream from chunks
+    return new SequenceInputStream(new Enumeration<InputStream>()
+    {
+      @Override
+      public boolean hasMoreElements()
+      {
+        // don't stop until the whole object is downloaded
+        return currReadStart.get() < readEnd;
+      }
+
+      @Override
+      public InputStream nextElement()
+      {
+        File outFile = new File(config.getTempDir().getAbsolutePath(), UUID.randomUUID().toString());
+        // in a single chunk, only download a maximum of DOWNLOAD_MAX_CHUNK_SIZE
+        long endPoint = Math.min(currReadStart.get() + DOWNLOAD_MAX_CHUNK_SIZE, readEnd) - 1;
+        try {
+          if (!outFile.createNewFile()) {
+            throw new IOE(
+                StringUtils.format(
+                    "Could not create temporary file [%s] for copying [%s]",
+                    outFile.getAbsolutePath(),
+                    objectPath(path)
+                )
             );
-            offsetObjectRequest.setRange(offset);
-            return open(offsetObjectRequest);
           }
-        },
-        S3Utils.S3RETRY,
-        config.getMaxRetry()
-    );
+          FileUtils.copyLarge(
+              () -> new RetryingInputStream<>(
+                  new GetObjectRequest(
+                      config.getBucket(),
+                      objectPath(path)
+                  ).withRange(currReadStart.get(), endPoint),
+                  new ObjectOpenFunction<GetObjectRequest>()
+                  {
+                    @Override
+                    public InputStream open(GetObjectRequest object)
+                    {
+                      return s3Client.getObject(object).getObjectContent();
+                    }
+
+                    @Override
+                    public InputStream open(GetObjectRequest object, long offset)
+                    {
+                      if (object.getRange() != null) {
+                        long[] oldRange = object.getRange();
+                        object.setRange(oldRange[0] + offset, oldRange[1]);
+                      } else {
+                        object.setRange(offset);
+                      }
+                      return open(object);
+                    }
+                  },
+                  S3Utils.S3RETRY,
+                  config.getMaxRetry()
+              ),
+              outFile,
+              new byte[8 * 1024],
+              Predicates.alwaysFalse(),
+              1,
+              StringUtils.format("Retrying copying of [%s] to [%s]", objectPath(path), outFile.getAbsolutePath())
+          );
+        }
+        catch (IOException e) {
+          throw new UncheckedIOException(e);

Review Comment:
   What's an UncheckedIOException other than just a RuntimeException?



##########
extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java:
##########
@@ -104,9 +105,15 @@ public void pathExists() throws IOException
   public void pathRead() throws IOException
   {
     EasyMock.reset(S3_CLIENT);
+    ObjectMetadata objectMetadata = new ObjectMetadata();
+    long contentLength = "test".getBytes(StandardCharsets.UTF_8).length;
+    objectMetadata.setContentLength(contentLength);
     S3Object s3Object = new S3Object();
     s3Object.setObjectContent(new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8)));
-    EasyMock.expect(S3_CLIENT.getObject(new GetObjectRequest(BUCKET, PREFIX + "/" + TEST_FILE))).andReturn(s3Object);
+    EasyMock.expect(S3_CLIENT.getObjectMetadata(EasyMock.anyObject())).andReturn(objectMetadata);
+    EasyMock.expect(S3_CLIENT.getObject(
+        new GetObjectRequest(BUCKET, PREFIX + "/" + TEST_FILE).withRange(0, contentLength - 1))
+    ).andReturn(s3Object);
     EasyMock.replay(S3_CLIENT);

Review Comment:
   You don't have any testing for the retry behavior.  If retries exist with the configs, please test all of the different ways that retries can happen and whether we expect multiplicativity or not.



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java:
##########
@@ -78,35 +100,117 @@ public InputStream readRange(String path, long from, long size) throws IOExcepti
           size
       );
     }
-    return buildInputStream(new GetObjectRequest(config.getBucket(), objectPath(path)).withRange(from, from + size - 1));
+    return buildInputStream(
+        new GetObjectRequest(config.getBucket(), objectPath(path)).withRange(from, from + size - 1),
+        path
+    );
   }
 
-  private RetryingInputStream buildInputStream(GetObjectRequest getObjectRequest) throws IOException
+  private InputStream buildInputStream(GetObjectRequest getObjectRequest, String path)
   {
-    return new RetryingInputStream<>(
-        getObjectRequest,
-        new ObjectOpenFunction<GetObjectRequest>()
-        {
-          @Override
-          public InputStream open(GetObjectRequest object)
-          {
-            return s3Client.getObject(object).getObjectContent();
-          }
+    // fetch the size of the whole object to make chunks
+    long readEnd;
+    AtomicLong currReadStart = new AtomicLong(0);
+    if (getObjectRequest.getRange() != null) {
+      currReadStart.set(getObjectRequest.getRange()[0]);
+      readEnd = getObjectRequest.getRange()[1] + 1;
+    } else {
+      readEnd = this.s3Client.getObjectMetadata(config.getBucket(), objectPath(path)).getInstanceLength();
+    }
 
-          @Override
-          public InputStream open(GetObjectRequest object, long offset)
-          {
-            final GetObjectRequest offsetObjectRequest = new GetObjectRequest(
-                object.getBucketName(),
-                object.getKey()
+    // build a sequence input stream from chunks
+    return new SequenceInputStream(new Enumeration<InputStream>()
+    {
+      @Override
+      public boolean hasMoreElements()
+      {
+        // don't stop until the whole object is downloaded
+        return currReadStart.get() < readEnd;
+      }
+
+      @Override
+      public InputStream nextElement()
+      {
+        File outFile = new File(config.getTempDir().getAbsolutePath(), UUID.randomUUID().toString());
+        // in a single chunk, only download a maximum of DOWNLOAD_MAX_CHUNK_SIZE
+        long endPoint = Math.min(currReadStart.get() + DOWNLOAD_MAX_CHUNK_SIZE, readEnd) - 1;
+        try {
+          if (!outFile.createNewFile()) {
+            throw new IOE(
+                StringUtils.format(
+                    "Could not create temporary file [%s] for copying [%s]",
+                    outFile.getAbsolutePath(),
+                    objectPath(path)
+                )
             );
-            offsetObjectRequest.setRange(offset);
-            return open(offsetObjectRequest);
           }
-        },
-        S3Utils.S3RETRY,
-        config.getMaxRetry()
-    );
+          FileUtils.copyLarge(
+              () -> new RetryingInputStream<>(

Review Comment:
   Do you really need to do this inside of a retrying input stream?  We've had a rash of issues where code was trying to retry a thing, and then a layer above it was trying to retry a thing and then a layer above that leading to multiplicative growth of retries and really long delays in processing.  Given that the whole fault-tolerance stuff exists and will retry failed tasks anyway, perhaps this isn't the right layer to add yet another set of retries?  Taht or like, limit it to only 2 retries at a max if we really do want to have a retry at this layer.



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java:
##########
@@ -78,35 +100,117 @@ public InputStream readRange(String path, long from, long size) throws IOExcepti
           size
       );
     }
-    return buildInputStream(new GetObjectRequest(config.getBucket(), objectPath(path)).withRange(from, from + size - 1));
+    return buildInputStream(
+        new GetObjectRequest(config.getBucket(), objectPath(path)).withRange(from, from + size - 1),
+        path
+    );
   }
 
-  private RetryingInputStream buildInputStream(GetObjectRequest getObjectRequest) throws IOException
+  private InputStream buildInputStream(GetObjectRequest getObjectRequest, String path)
   {
-    return new RetryingInputStream<>(
-        getObjectRequest,
-        new ObjectOpenFunction<GetObjectRequest>()
-        {
-          @Override
-          public InputStream open(GetObjectRequest object)
-          {
-            return s3Client.getObject(object).getObjectContent();
-          }
+    // fetch the size of the whole object to make chunks
+    long readEnd;
+    AtomicLong currReadStart = new AtomicLong(0);
+    if (getObjectRequest.getRange() != null) {
+      currReadStart.set(getObjectRequest.getRange()[0]);
+      readEnd = getObjectRequest.getRange()[1] + 1;
+    } else {
+      readEnd = this.s3Client.getObjectMetadata(config.getBucket(), objectPath(path)).getInstanceLength();
+    }
 
-          @Override
-          public InputStream open(GetObjectRequest object, long offset)
-          {
-            final GetObjectRequest offsetObjectRequest = new GetObjectRequest(
-                object.getBucketName(),
-                object.getKey()
+    // build a sequence input stream from chunks
+    return new SequenceInputStream(new Enumeration<InputStream>()
+    {
+      @Override
+      public boolean hasMoreElements()
+      {
+        // don't stop until the whole object is downloaded
+        return currReadStart.get() < readEnd;
+      }
+
+      @Override
+      public InputStream nextElement()
+      {
+        File outFile = new File(config.getTempDir().getAbsolutePath(), UUID.randomUUID().toString());
+        // in a single chunk, only download a maximum of DOWNLOAD_MAX_CHUNK_SIZE
+        long endPoint = Math.min(currReadStart.get() + DOWNLOAD_MAX_CHUNK_SIZE, readEnd) - 1;
+        try {
+          if (!outFile.createNewFile()) {
+            throw new IOE(
+                StringUtils.format(
+                    "Could not create temporary file [%s] for copying [%s]",
+                    outFile.getAbsolutePath(),
+                    objectPath(path)
+                )
             );
-            offsetObjectRequest.setRange(offset);
-            return open(offsetObjectRequest);
           }
-        },
-        S3Utils.S3RETRY,
-        config.getMaxRetry()
-    );
+          FileUtils.copyLarge(
+              () -> new RetryingInputStream<>(
+                  new GetObjectRequest(
+                      config.getBucket(),
+                      objectPath(path)
+                  ).withRange(currReadStart.get(), endPoint),
+                  new ObjectOpenFunction<GetObjectRequest>()
+                  {
+                    @Override
+                    public InputStream open(GetObjectRequest object)
+                    {
+                      return s3Client.getObject(object).getObjectContent();
+                    }
+
+                    @Override
+                    public InputStream open(GetObjectRequest object, long offset)
+                    {
+                      if (object.getRange() != null) {
+                        long[] oldRange = object.getRange();
+                        object.setRange(oldRange[0] + offset, oldRange[1]);
+                      } else {
+                        object.setRange(offset);
+                      }
+                      return open(object);
+                    }
+                  },
+                  S3Utils.S3RETRY,
+                  config.getMaxRetry()
+              ),
+              outFile,
+              new byte[8 * 1024],
+              Predicates.alwaysFalse(),
+              1,
+              StringUtils.format("Retrying copying of [%s] to [%s]", objectPath(path), outFile.getAbsolutePath())
+          );
+        }
+        catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+        try {
+          AtomicBoolean isClosed = new AtomicBoolean(false);
+          return new FileInputStream(outFile)
+          {
+            @Override
+            public void close() throws IOException
+            {
+              // close should be idempotent
+              if (isClosed.get()) {
+                return;
+              }

Review Comment:
   I'm very much scared of places where the contract is that close can be called multiple times.  It's indicative of a lack of ability and understanding of when the lifecycle of an object is truly over and often highlights other problems.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org