You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ag...@apache.org on 2021/11/26 19:46:06 UTC

[druid] branch master updated: AWS "Data read has a different length than the expected" error should reset stream and try again (#11941)

This is an automated email from the ASF dual-hosted git repository.

agonzalez pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8eff633  AWS "Data read has a different length than the expected" error should reset stream and try again (#11941)
8eff633 is described below

commit 8eff6334f7750f1be00a283456d7a78c031317d4
Author: Agustin Gonzalez <ag...@imply.io>
AuthorDate: Fri Nov 26 12:45:34 2021 -0700

    AWS "Data read has a different length than the expected" error should reset stream and try again (#11941)
    
    * Add support for a custom reset condition, and let the other arguments fall back to defaults so the method API is consistent
    
    * Add support for custom reset condition to InputEntity
    
    * Fix test names
    
    * Clarify comments on why we need to read the message's content to identify S3's resettable exception
    
    * Add unit test to verify custom resettable condition for S3Entity
    
    * Provide a way to customize retries since they are expensive to test
---
 .../org/apache/druid/data/input/InputEntity.java   |  12 ++
 .../druid/data/input/RetryingInputEntity.java      |   9 +-
 .../druid/data/input/impl/RetryingInputStream.java |  32 ++++-
 .../data/input/impl/prefetch/FileFetcher.java      |   2 +-
 .../impl/prefetch/RetryingInputStreamTest.java     | 148 +++++++++++++++------
 .../org/apache/druid/data/input/s3/S3Entity.java   |  33 +++++
 .../apache/druid/data/input/s3/S3InputSource.java  |  27 +++-
 .../druid/storage/s3/ObjectSummaryIterator.java    |  31 ++++-
 .../java/org/apache/druid/storage/s3/S3Utils.java  |  28 ++++
 .../druid/data/input/s3/S3InputSourceTest.java     |  72 ++++++++++
 10 files changed, 339 insertions(+), 55 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/data/input/InputEntity.java b/core/src/main/java/org/apache/druid/data/input/InputEntity.java
index a253ac6..7d5af4c 100644
--- a/core/src/main/java/org/apache/druid/data/input/InputEntity.java
+++ b/core/src/main/java/org/apache/druid/data/input/InputEntity.java
@@ -21,6 +21,7 @@ package org.apache.druid.data.input;
 
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
+import org.apache.druid.data.input.impl.RetryingInputStream;
 import org.apache.druid.guice.annotations.UnstableApi;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.StringUtils;
@@ -128,4 +129,15 @@ public interface InputEntity
   {
     return Predicates.alwaysFalse();
   }
+
+  /**
+   * Returns a reset condition that the caller should retry on.
+   * The returned condition should be used when reading data from this InputEntity such as in {@link #fetch}
+   * or {@link RetryingInputEntity}.
+   */
+  default Predicate<Throwable> getResetCondition()
+  {
+    return RetryingInputStream.DEFAULT_RESET_CONDITION;
+  }
+
 }
diff --git a/core/src/main/java/org/apache/druid/data/input/RetryingInputEntity.java b/core/src/main/java/org/apache/druid/data/input/RetryingInputEntity.java
index 3c0aa44..94dd509 100644
--- a/core/src/main/java/org/apache/druid/data/input/RetryingInputEntity.java
+++ b/core/src/main/java/org/apache/druid/data/input/RetryingInputEntity.java
@@ -40,11 +40,18 @@ public abstract class RetryingInputEntity implements InputEntity
         this,
         new RetryingInputEntityOpenFunction(),
         getRetryCondition(),
-        RetryUtils.DEFAULT_MAX_TRIES
+        getResetCondition(),
+        getMaxRetries()
     );
     return CompressionUtils.decompress(retryingInputStream, getPath());
   }
 
+  // override this in sub-classes to customize retries
+  protected int getMaxRetries()
+  {
+    return RetryUtils.DEFAULT_MAX_TRIES;
+  }
+
   /**
    * Directly opens an {@link InputStream} on the input entity. Decompression should be handled externally, and is
    * handled by the default implementation of {@link #open}, so this should return the raw stream for the object.
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/RetryingInputStream.java b/core/src/main/java/org/apache/druid/data/input/impl/RetryingInputStream.java
index 13349e0..ac0a11b 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/RetryingInputStream.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/RetryingInputStream.java
@@ -20,6 +20,7 @@
 package org.apache.druid.data.input.impl;
 
 import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.base.Throwables;
 import com.google.common.io.CountingInputStream;
 import org.apache.druid.data.input.impl.prefetch.Fetcher;
@@ -27,6 +28,7 @@ import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
 import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.SocketException;
@@ -39,31 +41,49 @@ import java.net.SocketException;
  */
 public class RetryingInputStream<T> extends InputStream
 {
+
+  public static final Predicate<Throwable> DEFAULT_RETRY_CONDITION = Predicates.alwaysFalse();
+  public static final Predicate<Throwable> DEFAULT_RESET_CONDITION = RetryingInputStream::isConnectionReset;
+
   private static final Logger log = new Logger(RetryingInputStream.class);
 
   private final T object;
   private final ObjectOpenFunction<T> objectOpenFunction;
   private final Predicate<Throwable> retryCondition;
+  private final Predicate<Throwable> resetCondition;
   private final int maxRetry;
 
   private CountingInputStream delegate;
   private long startOffset;
 
+  /**
+   *
+   * @param object The object entity to open
+   * @param objectOpenFunction How to open the object
+   * @param retryCondition A predicate on a throwable to indicate if stream should retry. This defaults to
+   *                       {@link IOException}, not retryable, when null is passed
+   * @param resetCondition A predicate on a throwable to indicate if stream should reset. This defaults to
+   *                       a generic reset test, see {@link #isConnectionReset(Throwable)} when null is passed
+   * @param maxRetry      The maximum times to retry. Defaults to {@link RetryUtils#DEFAULT_MAX_TRIES} when null
+   * @throws IOException
+   */
   public RetryingInputStream(
       T object,
       ObjectOpenFunction<T> objectOpenFunction,
-      Predicate<Throwable> retryCondition,
-      int maxRetry
+      @Nullable Predicate<Throwable> retryCondition,
+      @Nullable Predicate<Throwable> resetCondition,
+      @Nullable Integer maxRetry
   ) throws IOException
   {
     this.object = object;
     this.objectOpenFunction = objectOpenFunction;
-    this.retryCondition = retryCondition;
-    this.maxRetry = maxRetry;
+    this.retryCondition = retryCondition == null ? DEFAULT_RETRY_CONDITION : retryCondition;
+    this.resetCondition = resetCondition == null ? DEFAULT_RESET_CONDITION : resetCondition;
+    this.maxRetry = maxRetry == null ? RetryUtils.DEFAULT_MAX_TRIES : maxRetry;
     this.delegate = new CountingInputStream(objectOpenFunction.open(object));
   }
 
-  private boolean isConnectionReset(Throwable t)
+  private static boolean isConnectionReset(Throwable t)
   {
     return (t instanceof SocketException && (t.getMessage() != null && t.getMessage().contains("Connection reset"))) ||
            (t.getCause() != null && isConnectionReset(t.getCause()));
@@ -71,7 +91,7 @@ public class RetryingInputStream<T> extends InputStream
 
   private void waitOrThrow(Throwable t, int nTry) throws IOException
   {
-    final boolean isConnectionReset = isConnectionReset(t);
+    final boolean isConnectionReset = resetCondition.apply(t);
     if (isConnectionReset || retryCondition.apply(t)) {
       if (isConnectionReset) {
         // Re-open the input stream on connection reset
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/FileFetcher.java b/core/src/main/java/org/apache/druid/data/input/impl/prefetch/FileFetcher.java
index 7d869da..5009c4f 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/FileFetcher.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/prefetch/FileFetcher.java
@@ -98,7 +98,7 @@ public class FileFetcher<T> extends Fetcher<T>
   {
     return new OpenObject<>(
         object,
-        new RetryingInputStream<>(object, openObjectFunction, retryCondition, getFetchConfig().getMaxFetchRetry()),
+        new RetryingInputStream<>(object, openObjectFunction, retryCondition, null, getFetchConfig().getMaxFetchRetry()),
         getNoopCloser()
     );
   }
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/prefetch/RetryingInputStreamTest.java b/core/src/test/java/org/apache/druid/data/input/impl/prefetch/RetryingInputStreamTest.java
index 51b5206..a2dd29e 100644
--- a/core/src/test/java/org/apache/druid/data/input/impl/prefetch/RetryingInputStreamTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/impl/prefetch/RetryingInputStreamTest.java
@@ -29,6 +29,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import javax.annotation.Nonnull;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
@@ -43,13 +44,33 @@ import java.util.zip.GZIPOutputStream;
 public class RetryingInputStreamTest
 {
   private static final int MAX_RETRY = 5;
-  private static final int MAX_ERROR = 4;
 
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
   private File testFile;
-  private DataInputStream inputStream;
+
+  private boolean throwSocketException = false;
+  private boolean throwCustomException = false;
+  private boolean throwIOException = false;
+
+
+  private final ObjectOpenFunction<File> objectOpenFunction = new ObjectOpenFunction<File>()
+  {
+    @Override
+    public InputStream open(File object) throws IOException
+    {
+      return new TestInputStream(new FileInputStream(object));
+    }
+
+    @Override
+    public InputStream open(File object, long start) throws IOException
+    {
+      final FileInputStream fis = new FileInputStream(object);
+      Preconditions.checkState(fis.skip(start) == start);
+      return new TestInputStream(fis);
+    }
+  };
 
   @Before
   public void setup() throws IOException
@@ -63,53 +84,83 @@ public class RetryingInputStreamTest
         dis.writeInt(i);
       }
     }
+    throwSocketException = false;
+    throwCustomException = false;
+    throwIOException = false;
+  }
+
+  @After
+  public void teardown() throws IOException
+  {
+    FileUtils.forceDelete(testFile);
+  }
 
-    throwError = false;
 
+  @Test(expected = IOException.class)
+  public void testDefaultsReadThrows() throws IOException
+  {
+    throwIOException = true;
     final InputStream retryingInputStream = new RetryingInputStream<>(
         testFile,
-        new ObjectOpenFunction<File>()
-        {
-          @Override
-          public InputStream open(File object) throws IOException
-          {
-            return new TestInputStream(new FileInputStream(object));
-          }
-
-          @Override
-          public InputStream open(File object, long start) throws IOException
-          {
-            final FileInputStream fis = new FileInputStream(object);
-            Preconditions.checkState(fis.skip(start) == start);
-            return new TestInputStream(fis);
-          }
-        },
-        e -> e instanceof IOException,
+        objectOpenFunction,
+        null, // will not retry
+        null, // will enable reset using default logic
         MAX_RETRY
     );
+    retryHelper(retryingInputStream);
+  }
 
-    inputStream = new DataInputStream(new GZIPInputStream(retryingInputStream));
-
-    throwError = true;
+  @Test
+  public void testCustomResetRead() throws IOException
+  {
+    throwCustomException = true;
+    final InputStream retryingInputStream = new RetryingInputStream<>(
+        testFile,
+        objectOpenFunction,
+        null, // retry will fail
+        t -> t instanceof CustomException, // but reset won't
+        MAX_RETRY
+    );
+    retryHelper(retryingInputStream);
   }
 
-  @After
-  public void teardown() throws IOException
+  @Test(expected = IOException.class)
+  public void testCustomResetReadThrows() throws IOException
   {
-    inputStream.close();
-    FileUtils.forceDelete(testFile);
+    throwCustomException = true;
+    final InputStream retryingInputStream = new RetryingInputStream<>(
+        testFile,
+        objectOpenFunction,
+        null, // will not retry
+        null, // since there is no custom reset lambda it will fail when the custom exception is thrown
+        MAX_RETRY
+    );
+    retryHelper(retryingInputStream);
   }
 
   @Test
-  public void testReadRetry() throws IOException
+  public void testIOExceptionNotRetriableRead() throws IOException
   {
-    for (int i = 0; i < 10000; i++) {
-      Assert.assertEquals(i, inputStream.readInt());
-    }
+    throwCustomException = true;
+    throwIOException = true;
+    final InputStream retryingInputStream = new RetryingInputStream<>(
+        testFile,
+        objectOpenFunction,
+        t -> t instanceof IOException, // retry will succeed
+        t -> t instanceof CustomException, // reset will also succeed
+        MAX_RETRY
+    );
+    retryHelper(retryingInputStream);
   }
 
-  private boolean throwError = true;
-  private int errorCount = 0;
+  private void retryHelper(InputStream retryingInputStream) throws IOException
+  {
+    try (DataInputStream inputStream = new DataInputStream(new GZIPInputStream(retryingInputStream))) {
+      for (int i = 0; i < 10000; i++) {
+        Assert.assertEquals(i, inputStream.readInt());
+      }
+    }
+  }
 
   private class TestInputStream extends InputStream
   {
@@ -127,21 +178,30 @@ public class RetryingInputStreamTest
     }
 
     @Override
-    public int read(byte[] b, int off, int len) throws IOException
+    public int read(@Nonnull byte[] b, int off, int len) throws IOException
     {
-      if (throwError) {
-        throwError = false;
-        errorCount++;
-        if (errorCount % 2 == 0) {
-          throw new IOException("test retry");
-        } else {
-          delegate.close();
-          throw new SocketException("Test Connection reset");
-        }
+      if (throwIOException) {
+        throwIOException = false;
+        throw new IOException("test retry");
+      } else if (throwCustomException) {
+        throwCustomException = false;
+        RuntimeException e = new RuntimeException();
+        throw new CustomException("I am a custom ResettableException", e);
+      } else if (throwSocketException) {
+        throwSocketException = false;
+        delegate.close();
+        throw new SocketException("Test Connection reset");
       } else {
-        throwError = errorCount < MAX_ERROR;
         return delegate.read(b, off, len);
       }
     }
   }
+
+  private static class CustomException extends RuntimeException
+  {
+    public CustomException(String err, Throwable t)
+    {
+      super(err, t);
+    }
+  }
 }
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3Entity.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3Entity.java
index 9c05cc3..fb32592 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3Entity.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3Entity.java
@@ -19,13 +19,16 @@
 
 package org.apache.druid.data.input.s3;
 
+import com.amazonaws.SdkClientException;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.S3Object;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import org.apache.druid.data.input.RetryingInputEntity;
 import org.apache.druid.data.input.impl.CloudObjectLocation;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.storage.s3.S3StorageDruidModule;
 import org.apache.druid.storage.s3.S3Utils;
 import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
@@ -38,11 +41,29 @@ public class S3Entity extends RetryingInputEntity
 {
   private final ServerSideEncryptingAmazonS3 s3Client;
   private final CloudObjectLocation object;
+  private final int maxRetries;
 
   S3Entity(ServerSideEncryptingAmazonS3 s3Client, CloudObjectLocation coords)
   {
     this.s3Client = s3Client;
     this.object = coords;
+    this.maxRetries = RetryUtils.DEFAULT_MAX_TRIES;
+  }
+
+  // this was added for testing but it might be useful in other cases (you can
+  // configure maxRetries...
+  S3Entity(ServerSideEncryptingAmazonS3 s3Client, CloudObjectLocation coords, int maxRetries)
+  {
+    Preconditions.checkArgument(maxRetries >= 0);
+    this.s3Client = s3Client;
+    this.object = coords;
+    this.maxRetries = maxRetries;
+  }
+
+  @Override
+  protected int getMaxRetries()
+  {
+    return maxRetries;
   }
 
   @Override
@@ -84,4 +105,16 @@ public class S3Entity extends RetryingInputEntity
   {
     return S3Utils.S3RETRY;
   }
+
+  @Override
+  public Predicate<Throwable> getResetCondition()
+  {
+    // SdkClientException can be thrown for many reasons and the only way to
+    // distinguish it is to look at the message, this is not ideal since the
+    // message may change so it may need to be adjusted in the future
+    return t -> super.getResetCondition().apply(t) ||
+                (t instanceof SdkClientException &&
+                 t.getMessage().contains("Data read has a different length than the expected"));
+  }
+
 }
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
index 24fd99c..20688ab 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
@@ -40,6 +40,7 @@ import org.apache.druid.data.input.SplitHintSpec;
 import org.apache.druid.data.input.impl.CloudObjectInputSource;
 import org.apache.druid.data.input.impl.CloudObjectLocation;
 import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.storage.s3.S3InputDataConfig;
 import org.apache.druid.storage.s3.S3StorageDruidModule;
@@ -67,6 +68,7 @@ public class S3InputSource extends CloudObjectInputSource
   private final S3InputSourceConfig s3InputSourceConfig;
   private final S3InputDataConfig inputDataConfig;
   private final AWSCredentialsProvider awsCredentialsProvider;
+  private int maxRetries;
 
   /**
    * Constructor for S3InputSource
@@ -124,6 +126,7 @@ public class S3InputSource extends CloudObjectInputSource
           }
         }
     );
+    this.maxRetries = RetryUtils.DEFAULT_MAX_TRIES;
     this.awsCredentialsProvider = awsCredentialsProvider;
   }
 
@@ -141,6 +144,22 @@ public class S3InputSource extends CloudObjectInputSource
     this(s3Client, s3ClientBuilder, inputDataConfig, uris, prefixes, objects, s3InputSourceConfig, null);
   }
 
+  @VisibleForTesting
+  public S3InputSource(
+      ServerSideEncryptingAmazonS3 s3Client,
+      ServerSideEncryptingAmazonS3.Builder s3ClientBuilder,
+      S3InputDataConfig inputDataConfig,
+      List<URI> uris,
+      List<URI> prefixes,
+      List<CloudObjectLocation> objects,
+      S3InputSourceConfig s3InputSourceConfig,
+      int maxRetries
+  )
+  {
+    this(s3Client, s3ClientBuilder, inputDataConfig, uris, prefixes, objects, s3InputSourceConfig, null);
+    this.maxRetries = maxRetries;
+  }
+
   private void applyAssumeRole(
       ServerSideEncryptingAmazonS3.Builder s3ClientBuilder,
       S3InputSourceConfig s3InputSourceConfig,
@@ -186,7 +205,7 @@ public class S3InputSource extends CloudObjectInputSource
   @Override
   protected InputEntity createEntity(CloudObjectLocation location)
   {
-    return new S3Entity(s3ClientSupplier.get(), location);
+    return new S3Entity(s3ClientSupplier.get(), location, maxRetries);
   }
 
   @Override
@@ -254,6 +273,10 @@ public class S3InputSource extends CloudObjectInputSource
 
   private Iterable<S3ObjectSummary> getIterableObjectsFromPrefixes()
   {
-    return () -> S3Utils.objectSummaryIterator(s3ClientSupplier.get(), getPrefixes(), inputDataConfig.getMaxListingLength());
+    return () -> S3Utils.objectSummaryIterator(s3ClientSupplier.get(),
+                                               getPrefixes(),
+                                               inputDataConfig.getMaxListingLength(),
+                                               maxRetries
+                                               );
   }
 }
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ObjectSummaryIterator.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ObjectSummaryIterator.java
index a515c15..59c6aec 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ObjectSummaryIterator.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ObjectSummaryIterator.java
@@ -23,7 +23,9 @@ import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.ListObjectsV2Request;
 import com.amazonaws.services.s3.model.ListObjectsV2Result;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.RetryUtils;
 
 import java.net.URI;
 import java.util.Iterator;
@@ -45,6 +47,8 @@ public class ObjectSummaryIterator implements Iterator<S3ObjectSummary>
   private ListObjectsV2Result result;
   private Iterator<S3ObjectSummary> objectSummaryIterator;
   private S3ObjectSummary currentObjectSummary;
+  private int maxRetries; // this is made available for testing mostly
+
 
   ObjectSummaryIterator(
       final ServerSideEncryptingAmazonS3 s3Client,
@@ -55,7 +59,32 @@ public class ObjectSummaryIterator implements Iterator<S3ObjectSummary>
     this.s3Client = s3Client;
     this.prefixesIterator = prefixes.iterator();
     this.maxListingLength = maxListingLength;
+    maxRetries = RetryUtils.DEFAULT_MAX_TRIES;
+
+    constructorPostProcessing();
 
+  }
+
+  @VisibleForTesting
+  ObjectSummaryIterator(
+      final ServerSideEncryptingAmazonS3 s3Client,
+      final Iterable<URI> prefixes,
+      final int maxListingLength,
+      final int maxRetries
+  )
+  {
+    this.s3Client = s3Client;
+    this.prefixesIterator = prefixes.iterator();
+    this.maxListingLength = maxListingLength;
+    this.maxRetries = maxRetries;
+
+    constructorPostProcessing();
+
+  }
+
+  // helper to factor out stuff that happens in constructor after members are set
+  private void constructorPostProcessing()
+  {
     prepareNextRequest();
     fetchNextBatch();
     advanceObjectSummary();
@@ -94,7 +123,7 @@ public class ObjectSummaryIterator implements Iterator<S3ObjectSummary>
   private void fetchNextBatch()
   {
     try {
-      result = S3Utils.retryS3Operation(() -> s3Client.listObjectsV2(request));
+      result = S3Utils.retryS3Operation(() -> s3Client.listObjectsV2(request), maxRetries);
       request.setContinuationToken(result.getNextContinuationToken());
       objectSummaryIterator = result.getObjectSummaries().iterator();
     }
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
index 75c4e12..1e6869f 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
@@ -83,6 +83,15 @@ public class S3Utils
     return RetryUtils.retry(f, S3RETRY, RetryUtils.DEFAULT_MAX_TRIES);
   }
 
+  /**
+   * Retries S3 operations that fail due to io-related exceptions. Service-level exceptions (access denied, file not
+   * found, etc) are not retried. Also provide a way to set maxRetries that can be useful, i.e. for testing.
+   */
+  static <T> T retryS3Operation(Task<T> f, int maxRetries) throws Exception
+  {
+    return RetryUtils.retry(f, S3RETRY, maxRetries);
+  }
+
   static boolean isObjectInBucketIgnoringPermission(
       ServerSideEncryptingAmazonS3 s3Client,
       String bucketName,
@@ -121,6 +130,25 @@ public class S3Utils
   }
 
   /**
+   * Create an iterator over a set of S3 objects specified by a set of prefixes.
+   *
+   * For each provided prefix URI, the iterator will walk through all objects that are in the same bucket as the
+   * provided URI and whose keys start with that URI's path, except for directory placeholders (which will be
+   * ignored). The iterator is computed incrementally by calling {@link ServerSideEncryptingAmazonS3#listObjectsV2} for
+   * each prefix in batches of {@param maxListLength}. The first call is made at the same time the iterator is
+   * constructed.
+   */
+  public static Iterator<S3ObjectSummary> objectSummaryIterator(
+      final ServerSideEncryptingAmazonS3 s3Client,
+      final Iterable<URI> prefixes,
+      final int maxListingLength,
+      final int maxRetries
+  )
+  {
+    return new ObjectSummaryIterator(s3Client, prefixes, maxListingLength, maxRetries);
+  }
+
+  /**
    * Create an {@link URI} from the given {@link S3ObjectSummary}. The result URI is composed as below.
    *
    * <pre>
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
index 957dd4d..cc5c55b 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.data.input.s3;
 
+import com.amazonaws.SdkClientException;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3Client;
@@ -28,6 +29,7 @@ import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.ListObjectsV2Request;
 import com.amazonaws.services.s3.model.ListObjectsV2Result;
 import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.DeserializationContext;
@@ -87,6 +89,8 @@ import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.easymock.EasyMock.expectLastCall;
+
 public class S3InputSourceTest extends InitializedNullHandlingTest
 {
   private static final ObjectMapper MAPPER = createS3ObjectMapper();
@@ -535,6 +539,49 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
     EasyMock.verify(S3_CLIENT);
   }
 
+  @Test(expected = SdkClientException.class)
+  public void testReaderRetriesOnSdkClientExceptionButNeverSucceedsThenThrows() throws Exception
+  {
+    EasyMock.reset(S3_CLIENT);
+    expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT);
+    expectSdkClientException(EXPECTED_URIS.get(0));
+    EasyMock.replay(S3_CLIENT);
+
+    S3InputSource inputSource = new S3InputSource(
+        SERVICE,
+        SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+        INPUT_DATA_CONFIG,
+        null,
+        ImmutableList.of(PREFIXES.get(0)),
+        null,
+        null,
+        3 // only have three retries since they are slow
+    );
+
+    InputRowSchema someSchema = new InputRowSchema(
+        new TimestampSpec("time", "auto", null),
+        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
+        ColumnsFilter.all()
+    );
+
+    InputSourceReader reader = inputSource.reader(
+        someSchema,
+        new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0),
+        temporaryFolder.newFolder()
+    );
+
+    CloseableIterator<InputRow> iterator = reader.read();
+
+    while (iterator.hasNext()) {
+      InputRow nextRow = iterator.next();
+      Assert.assertEquals(NOW, nextRow.getTimestamp());
+      Assert.assertEquals("hello", nextRow.getDimension("dim1").get(0));
+      Assert.assertEquals("world", nextRow.getDimension("dim2").get(0));
+    }
+
+    EasyMock.verify(S3_CLIENT);
+  }
+
   @Test
   public void testCompressedReader() throws IOException
   {
@@ -620,6 +667,31 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
     EasyMock.expect(S3_CLIENT.getObject(EasyMock.anyObject(GetObjectRequest.class))).andReturn(someObject).once();
   }
 
+
+  // Setup mocks for invoquing the resettable condition for the S3Entity:
+  private static void expectSdkClientException(URI uri) throws IOException
+  {
+    final String s3Bucket = uri.getAuthority();
+    final String key = S3Utils.extractS3Key(uri);
+
+    S3ObjectInputStream someInputStream = EasyMock.createMock(S3ObjectInputStream.class);
+    EasyMock.expect(someInputStream.read(EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt()))
+            .andThrow(new SdkClientException("Data read has a different length than the expected")).anyTimes();
+    someInputStream.close();
+    expectLastCall().andVoid().anyTimes();
+
+    S3Object someObject = EasyMock.createMock(S3Object.class);
+    EasyMock.expect(someObject.getBucketName()).andReturn(s3Bucket).anyTimes();
+    EasyMock.expect(someObject.getKey()).andReturn(key).anyTimes();
+    EasyMock.expect(someObject.getObjectContent()).andReturn(someInputStream).anyTimes();
+
+    EasyMock.expect(S3_CLIENT.getObject(EasyMock.anyObject(GetObjectRequest.class))).andReturn(someObject).anyTimes();
+
+    EasyMock.replay(someObject);
+    EasyMock.replay(someInputStream);
+  }
+
+
   private static void expectGetObjectCompressed(URI uri) throws IOException
   {
     final String s3Bucket = uri.getAuthority();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org