Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/05/31 06:43:33 UTC

[GitHub] [iceberg] jackye1995 opened a new pull request, #4912: AWS: add retry logic to S3InputStream

jackye1995 opened a new pull request, #4912:
URL: https://github.com/apache/iceberg/pull/4912

   @danielcweeks @rajarshisarkar @amogh-jahagirdar @xiaoxuandev @singhpk234 
   
   Add retry logic to `S3InputStream` so that when we encounter network failures (mostly `SSLException` for server-side connection resets and `SocketTimeoutException` for client-side connection resets), we can quickly retry the read without failing the entire operation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #4912: AWS: add retry logic to S3InputStream

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on code in PR #4912:
URL: https://github.com/apache/iceberg/pull/4912#discussion_r885258008


##########
aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java:
##########
@@ -20,45 +20,216 @@
 package org.apache.iceberg.aws.s3;
 
 import com.adobe.testing.s3mock.junit4.S3MockRule;
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.aws.AwsProperties;
 import org.apache.iceberg.io.RangeReadable;
 import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.metrics.MetricsContext;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.exception.SdkClientException;
 import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.core.sync.ResponseTransformer;
 import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.BucketAlreadyExistsException;
+import software.amazon.awssdk.services.s3.model.BucketAlreadyOwnedByYouException;
 import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.InvalidObjectStateException;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.S3Exception;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
 
 public class TestS3InputStream {
   @ClassRule
   public static final S3MockRule S3_MOCK_RULE = S3MockRule.builder().silent().build();
 
-  private final S3Client s3 = S3_MOCK_RULE.createS3ClientV2();
+  private final S3Client s3Client = S3_MOCK_RULE.createS3ClientV2();
   private final Random random = new Random(1);
 
   @Before
   public void before() {
-    s3.createBucket(CreateBucketRequest.builder().bucket("bucket").build());
+    s3Client.createBucket(CreateBucketRequest.builder().bucket("bucket").build());
   }
 
   @Test
-  public void testRead() throws Exception {
+  public void testReadWithNormalClient() throws Exception {

Review Comment:
   I cannot find a way to parameterize the test here because of `S3MockRule`; parameterization always fails at initialization. I will keep looking for a way to remove the redundant tests. Meanwhile, I will also update the integration test to run against both a normal client and a fuzzy client.





[GitHub] [iceberg] jackye1995 closed pull request #4912: AWS: add retry logic to S3InputStream

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 closed pull request #4912: AWS: add retry logic to S3InputStream
URL: https://github.com/apache/iceberg/pull/4912




[GitHub] [iceberg] jackye1995 commented on a diff in pull request #4912: AWS: add retry logic to S3InputStream

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on code in PR #4912:
URL: https://github.com/apache/iceberg/pull/4912#discussion_r889726156


##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java:
##########
@@ -178,25 +281,56 @@ private void positionStream() throws IOException {
   }
 
   private void openStream() throws IOException {
-    GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder()
-        .bucket(location.bucket())
-        .key(location.key())
-        .range(String.format("bytes=%s-", pos));
-
-    S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
-
     closeStream();
-    stream = s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream());
+    stream = readRange(String.format("bytes=%s-", pos));
   }
 
   private void closeStream() throws IOException {
-    if (stream != null) {
-      stream.close();
+    closeServerSideStream(stream);
+    stream = null;
+  }
+
+  private static void closeServerSideStream(InputStream streamToClose) {
+    if (streamToClose != null) {
+      try {
+        if (streamToClose instanceof ResponseInputStream) {
+          // Stated in the ResponseInputStream javadoc:
+          // If it is not desired to read remaining data from the stream,
+          // you can explicitly abort the connection via abort().
+          ((ResponseInputStream<?>) streamToClose).abort();
+        } else {
+          streamToClose.close();
+        }
+      } catch (IOException | AbortedException e) {
+        // ignore failure to abort or close stream
+      }
     }
   }
 
-  public void setSkipSize(int skipSize) {
-    this.skipSize = skipSize;
+  private static boolean shouldRetry(Exception exception) {
+    if (exception instanceof UncheckedIOException) {
+      if (exception.getCause() instanceof EOFException) {
+        return false;
+      }
+    }
+
+    if (exception instanceof AwsServiceException) {
+      switch (((AwsServiceException) exception).statusCode()) {
+        case HttpURLConnection.HTTP_FORBIDDEN:
+        case HttpURLConnection.HTTP_BAD_REQUEST:
+          return false;
+      }
+    }
+
+    if (exception instanceof S3Exception) {
+      switch (((S3Exception) exception).statusCode()) {
+        case HttpURLConnection.HTTP_NOT_FOUND:
+        case 416: // range not satisfied
+          return false;
+      }
+    }
+
+    return true;

Review Comment:
   It's not. It's closer to the ones in Presto and Trino. Basically it retries almost all IO exceptions except for EOF, because they are most likely network issues. For AWS-side exceptions, this logic seems sufficient to me if it has proven sufficient in Presto and Trino. I am not sure we need to list every single possible exception class like S3A did. 





[GitHub] [iceberg] danielcweeks commented on a diff in pull request #4912: AWS: add retry logic to S3InputStream

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on code in PR #4912:
URL: https://github.com/apache/iceberg/pull/4912#discussion_r898507252


##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java:
##########
@@ -172,31 +274,62 @@ private void positionStream() throws IOException {
     }
 
     // close the stream and open at desired position
-    LOG.debug("Seek with new stream for {} to offset {}", location, next);
+    LOG.warn("Seek with new stream for {} to offset {}", location, next);
     pos = next;
     openStream();
   }
 
-  private void openStream() throws IOException {
-    GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder()
-        .bucket(location.bucket())
-        .key(location.key())
-        .range(String.format("bytes=%s-", pos));
-
-    S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
-
+  private void openStream() {
     closeStream();
-    stream = s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream());
+    stream = readRange(String.format("bytes=%s-", pos));
+  }
+
+  private void closeStream() {
+    closeServerSideStream(stream);
+    stream = null;
   }
 
-  private void closeStream() throws IOException {
-    if (stream != null) {
-      stream.close();
+  private static void closeServerSideStream(InputStream streamToClose) {
+    if (streamToClose != null) {
+      try {
+        if (streamToClose instanceof Abortable) {
+          // Stated in the ResponseInputStream javadoc:
+          // If it is not desired to read remaining data from the stream,
+          // you can explicitly abort the connection via abort().
+          ((Abortable) streamToClose).abort();
+        } else {
+          streamToClose.close();
+        }
+      } catch (IOException | AbortedException e) {
+        // ignore failure to abort or close stream
+      }
     }
   }
 
-  public void setSkipSize(int skipSize) {
-    this.skipSize = skipSize;
+  private static boolean shouldRetry(Exception exception) {

Review Comment:
   I think we need to be more explicit about the cases where we do want to retry rather than just defaulting to retry.  I think there were specific cases that were mentioned as known issues (e.g. socket timeout).  However, the problem we've had in other retry scenarios is that the retry cases are overly broad and retry when they really shouldn't.  
   
   I think the default here should return false and only return true for the known Exceptions.
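
A minimal sketch of the allow-list approach described above, using only JDK types. The class name `RetryAllowList` is hypothetical, and the two exception types are simply the ones named in the pull request description; this is not the predicate from the PR itself.

```java
import java.io.UncheckedIOException;
import java.net.SocketTimeoutException;
import javax.net.ssl.SSLException;

// Hypothetical helper, not part of Iceberg: retry only failures that are explicitly listed.
final class RetryAllowList {
  private RetryAllowList() {
  }

  static boolean shouldRetry(Throwable failure) {
    // Unwrap UncheckedIOException so the underlying IOException is what gets classified.
    Throwable cause = failure instanceof UncheckedIOException ? failure.getCause() : failure;

    if (cause instanceof SocketTimeoutException) {
      return true; // client-side timeout or reset
    }

    if (cause instanceof SSLException) {
      return true; // server-side connection reset surfaced through TLS
    }

    // Default: do not retry anything that is not a known transient failure.
    return false;
  }
}
```

With this shape, adding a new retryable case is an explicit decision, and an unexpected exception fails fast instead of being retried by default.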





[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #4912: AWS: add retry logic to S3InputStream

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #4912:
URL: https://github.com/apache/iceberg/pull/4912#discussion_r886199393


##########
aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java:
##########
@@ -353,6 +353,30 @@ public class AwsProperties implements Serializable {
   @Deprecated
   public static final boolean CLIENT_ENABLE_ETAG_CHECK_DEFAULT = false;
 
+  /**
+   * Number of times to retry S3 read operation.
+   */
+  public static final String S3_READ_RETRY_NUM_RETRIES = "s3.read.retry.num-retries";
+  public static final int S3_READ_RETRY_NUM_RETRIES_DEFAULT = 6;

Review Comment:
   Just want to confirm how these default values were selected. Are they from the Hadoop S3A configurations?



##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java:
##########
@@ -178,25 +281,56 @@ private void positionStream() throws IOException {
   }
 
   private void openStream() throws IOException {
-    GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder()
-        .bucket(location.bucket())
-        .key(location.key())
-        .range(String.format("bytes=%s-", pos));
-
-    S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
-
     closeStream();
-    stream = s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream());
+    stream = readRange(String.format("bytes=%s-", pos));
   }
 
   private void closeStream() throws IOException {
-    if (stream != null) {
-      stream.close();
+    closeServerSideStream(stream);
+    stream = null;
+  }
+
+  private static void closeServerSideStream(InputStream streamToClose) {
+    if (streamToClose != null) {
+      try {
+        if (streamToClose instanceof ResponseInputStream) {
+          // Stated in the ResponseInputStream javadoc:
+          // If it is not desired to read remaining data from the stream,
+          // you can explicitly abort the connection via abort().
+          ((ResponseInputStream<?>) streamToClose).abort();
+        } else {
+          streamToClose.close();
+        }
+      } catch (IOException | AbortedException e) {
+        // ignore failure to abort or close stream
+      }
     }
   }
 
-  public void setSkipSize(int skipSize) {
-    this.skipSize = skipSize;
+  private static boolean shouldRetry(Exception exception) {
+    if (exception instanceof UncheckedIOException) {
+      if (exception.getCause() instanceof EOFException) {
+        return false;
+      }
+    }
+
+    if (exception instanceof AwsServiceException) {
+      switch (((AwsServiceException) exception).statusCode()) {
+        case HttpURLConnection.HTTP_FORBIDDEN:
+        case HttpURLConnection.HTTP_BAD_REQUEST:
+          return false;
+      }
+    }
+
+    if (exception instanceof S3Exception) {
+      switch (((S3Exception) exception).statusCode()) {
+        case HttpURLConnection.HTTP_NOT_FOUND:
+        case 416: // range not satisfied
+          return false;
+      }
+    }
+
+    return true;

Review Comment:
   Curious: is this the same retry policy that Hadoop S3A has? 



##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java:
##########
@@ -88,23 +97,68 @@ public void seek(long newPos) {
 
   @Override
   public int read() throws IOException {
-    Preconditions.checkState(!closed, "Cannot read: already closed");
-    positionStream();
+    AtomicInteger byteRef = new AtomicInteger(0);
+    try {
+      Tasks.foreach(0)

Review Comment:
   Any reason we don't pass in the stream in the foreach? I guess it's a private field so it's a bit awkward to pass it in as an argument but seems more readable than Tasks.foreach(0) imo





[GitHub] [iceberg] jackye1995 commented on a diff in pull request #4912: AWS: add retry logic to S3InputStream

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on code in PR #4912:
URL: https://github.com/apache/iceberg/pull/4912#discussion_r889725978


##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java:
##########
@@ -88,23 +97,68 @@ public void seek(long newPos) {
 
   @Override
   public int read() throws IOException {
-    Preconditions.checkState(!closed, "Cannot read: already closed");
-    positionStream();
+    AtomicInteger byteRef = new AtomicInteger(0);
+    try {
+      Tasks.foreach(0)

Review Comment:
   I think we cannot, because the input stream needs to be closed and re-opened, but if we put it here then the retry will always retry against the same stream that is already closed after the first failure.
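
The point above can be sketched with plain `java.io` types: each retry attempt has to open a fresh stream at the current position instead of reusing the one that failed. `ReopeningReader` and `RangeOpener` are hypothetical names standing in for `S3InputStream` and its ranged GET, and the sketch calls `close()` because JDK streams have no `abort()`.

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical illustration only: a reader that reopens its stream on every retry attempt.
final class ReopeningReader {
  /** Stand-in for a ranged GET: opens a stream that starts at the given byte offset. */
  interface RangeOpener {
    InputStream openAt(long position) throws IOException;
  }

  private final RangeOpener opener;
  private final int maxAttempts;
  private InputStream stream;
  private long pos;

  ReopeningReader(RangeOpener opener, int maxAttempts) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    this.opener = opener;
    this.maxAttempts = maxAttempts;
  }

  int read() throws IOException {
    IOException last = null;
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      try {
        if (stream == null) {
          // The stream is looked up inside the attempt, so a retry never sees the failed one.
          stream = opener.openAt(pos);
        }
        int value = stream.read();
        if (value >= 0) {
          pos++;
        }
        return value;
      } catch (IOException e) {
        last = e;
        closeQuietly(); // drop the broken stream; the next attempt reopens at pos
      }
    }
    throw last;
  }

  private void closeQuietly() {
    if (stream != null) {
      try {
        stream.close();
      } catch (IOException ignored) {
        // ignore failure to close a stream that is being discarded anyway
      }
      stream = null;
    }
  }
}
```

If the stream were captured once and handed to the retry helper as its item, every attempt would operate on the same already-closed object, which is the problem described above.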





[GitHub] [iceberg] singhpk234 commented on pull request #4912: AWS: add retry logic to S3InputStream

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on PR #4912:
URL: https://github.com/apache/iceberg/pull/4912#issuecomment-1143718044

   [doubt] SDK V2 also makes it possible to define a [retry policy / condition](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/retry/PredefinedRetryPolicies.html#DEFAULT_RETRY_CONDITION) by overriding [ClientConfiguration](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html) while building the S3 client itself. Why are we opting to do it here via `Tasks`?




[GitHub] [iceberg] danielcweeks commented on a diff in pull request #4912: AWS: add retry logic to S3InputStream

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on code in PR #4912:
URL: https://github.com/apache/iceberg/pull/4912#discussion_r898561762


##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java:
##########
@@ -88,23 +96,69 @@ public void seek(long newPos) {
 
   @Override
   public int read() throws IOException {
-    Preconditions.checkState(!closed, "Cannot read: already closed");
-    positionStream();
+    int[] byteRef = new int[1];

Review Comment:
   We could also consider bringing in [Failsafe](https://failsafe.dev/), which is more closely aligned with what we want for these cases.  Though it is a new dependency (but zero deps and apache licensed).





[GitHub] [iceberg] danielcweeks commented on a diff in pull request #4912: AWS: add retry logic to S3InputStream

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on code in PR #4912:
URL: https://github.com/apache/iceberg/pull/4912#discussion_r898556389


##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java:
##########
@@ -88,23 +96,69 @@ public void seek(long newPos) {
 
   @Override
   public int read() throws IOException {
-    Preconditions.checkState(!closed, "Cannot read: already closed");
-    positionStream();
+    int[] byteRef = new int[1];

Review Comment:
   This is less than ideal, but Tasks is a little limited in this area because it's really based on threaded execution and we're reusing it here for retry.  It might be worth exploring whether we can tweak Tasks to support this use case:
   
   Ideally we'd have something like:
   
   ```java
   int read = Tasks.single()
       .exponentialBackoff(...)
       ...
       .run(<function>);
   ```
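
As a rough illustration of the value-returning shape sketched above, here is a self-contained helper built on JDK types only. `SingleTaskRetry` and its parameters are hypothetical; this is not the Iceberg `Tasks` API and not the Failsafe API, just one way a single-task retry with exponential backoff could return its result directly.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

// Hypothetical helper: runs one task, retrying with exponential backoff, and returns its value.
final class SingleTaskRetry {
  private SingleTaskRetry() {
  }

  static <T> T run(
      Callable<T> task,
      int maxRetries,
      long minWaitMs,
      long maxWaitMs,
      Predicate<Exception> shouldRetry) throws Exception {
    long waitMs = minWaitMs;
    for (int attempt = 0; ; attempt++) {
      try {
        return task.call();
      } catch (Exception e) {
        // Give up when retries are exhausted or the failure is not retryable.
        if (attempt >= maxRetries || !shouldRetry.test(e)) {
          throw e;
        }
        Thread.sleep(waitMs);
        // Double the wait for the next attempt, capped at maxWaitMs.
        waitMs = Math.min(maxWaitMs, waitMs * 2);
      }
    }
  }
}
```

Because the helper returns the task's value, the caller needs no `int[]` or `AtomicInteger` holder to carry the result out of a lambda.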
   
   





[GitHub] [iceberg] jackye1995 commented on a diff in pull request #4912: AWS: add retry logic to S3InputStream

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on code in PR #4912:
URL: https://github.com/apache/iceberg/pull/4912#discussion_r889726237


##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java:
##########
@@ -88,23 +97,68 @@ public void seek(long newPos) {
 
   @Override
   public int read() throws IOException {
-    Preconditions.checkState(!closed, "Cannot read: already closed");
-    positionStream();
+    AtomicInteger byteRef = new AtomicInteger(0);

Review Comment:
   Yes good point, it's only used to hold the value, not really trying to prevent any multi-thread access.





[GitHub] [iceberg] jackye1995 commented on pull request #4912: AWS: add retry logic to S3InputStream

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on PR #4912:
URL: https://github.com/apache/iceberg/pull/4912#issuecomment-1146866327

   > Introducing our own retries can ultimately conflict with other retries and can have some bad side-effects (like failures resulting in retries on top of retries).
   
   Yes you are right, I actually plan to add one more PR for configuring retry policy. The issue here is a bit different. I think there are 2 issues that cannot be handled at SDK level:
   1. AWS SDK only handles exceptions up to the AWS client calls, in this case `s3.getObject`. Once you get the input stream, further exceptions are not handled during operations like `read` and `seek`, and that's typically when network issues like connection timeout or socket timeout might happen. I haven't found logic in the SDK that could close a stream, re-seek to the right position and retry a read operation, but maybe I missed some code places.
   2. When closing the input stream, the closing should call `abort` instead of `close` to terminate the HTTP request. This should be done both at retry and stream close time. See [this javadoc](https://github.com/aws/aws-sdk-java-v2/blob/56e21b1d8e18f692ba9193c625661aa786e9946b/core/sdk-core/src/main/java/software/amazon/awssdk/core/ResponseInputStream.java#L29-L31) for more details.




[GitHub] [iceberg] singhpk234 commented on a diff in pull request #4912: AWS: add retry logic to S3InputStream

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on code in PR #4912:
URL: https://github.com/apache/iceberg/pull/4912#discussion_r885873002


##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java:
##########
@@ -88,23 +97,68 @@ public void seek(long newPos) {
 
   @Override
   public int read() throws IOException {
-    Preconditions.checkState(!closed, "Cannot read: already closed");
-    positionStream();
+    AtomicInteger byteRef = new AtomicInteger(0);

Review Comment:
   [question] Do we need `AtomicInteger` here? I think the requirement arises because the value is assigned inside the lambda passed to `Tasks` below. We could also use `final int byteRef[] = {0}`; does that seem reasonable, considering it's single-threaded access?
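
A small sketch of the holder pattern suggested above, assuming single-threaded use. `LambdaHolder` and `readThroughLambda` are hypothetical names; the `Runnable` stands in for the task body that the PR passes to `Tasks`.

```java
import java.util.function.IntSupplier;

// Hypothetical illustration: a one-element array carries a value out of a lambda.
final class LambdaHolder {
  private LambdaHolder() {
  }

  static int readThroughLambda(IntSupplier source) {
    // The array reference is effectively final, so the lambda may capture it;
    // the element itself stays mutable. No atomicity is needed on a single thread.
    final int[] byteRef = {0};
    Runnable task = () -> byteRef[0] = source.getAsInt();
    task.run();
    return byteRef[0];
  }
}
```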



##########
aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java:
##########
@@ -353,6 +353,30 @@ public class AwsProperties implements Serializable {
   @Deprecated
   public static final boolean CLIENT_ENABLE_ETAG_CHECK_DEFAULT = false;
 
+  /**
+   * Number of times to retry S3 read operation.
+   */
+  public static final String S3_READ_RETRY_NUM_RETRIES = "s3.read.retry.num-retries";
+  public static final int S3_READ_RETRY_NUM_RETRIES_DEFAULT = 6;

Review Comment:
   +1, would be really nice to know 
   
   I think they differ from the S3A constants: [Code Pointer](https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java#L122-L129)
   1. retry limit - 7
   2. retry interval - 500ms



##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java:
##########
@@ -178,25 +281,56 @@ private void positionStream() throws IOException {
   }
 
   private void openStream() throws IOException {
-    GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder()
-        .bucket(location.bucket())
-        .key(location.key())
-        .range(String.format("bytes=%s-", pos));
-
-    S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
-
     closeStream();
-    stream = s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream());
+    stream = readRange(String.format("bytes=%s-", pos));
   }
 
   private void closeStream() throws IOException {

Review Comment:
   [minor] Since we now catch the IOException in `closeServerSideStream`, `closeStream` no longer throws IOException, so we can remove it from the signatures of `closeStream` / `openStream`.



##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java:
##########
@@ -178,25 +281,56 @@ private void positionStream() throws IOException {
   }
 
   private void openStream() throws IOException {
-    GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder()
-        .bucket(location.bucket())
-        .key(location.key())
-        .range(String.format("bytes=%s-", pos));
-
-    S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
-
     closeStream();
-    stream = s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream());
+    stream = readRange(String.format("bytes=%s-", pos));
   }
 
   private void closeStream() throws IOException {
-    if (stream != null) {
-      stream.close();
+    closeServerSideStream(stream);
+    stream = null;
+  }
+
+  private static void closeServerSideStream(InputStream streamToClose) {
+    if (streamToClose != null) {
+      try {
+        if (streamToClose instanceof ResponseInputStream) {
+          // Stated in the ResponseInputStream javadoc:
+          // If it is not desired to read remaining data from the stream,
+          // you can explicitly abort the connection via abort().
+          ((ResponseInputStream<?>) streamToClose).abort();
+        } else {
+          streamToClose.close();
+        }
+      } catch (IOException | AbortedException e) {
+        // ignore failure to abort or close stream
+      }
     }
   }
 
-  public void setSkipSize(int skipSize) {
-    this.skipSize = skipSize;
+  private static boolean shouldRetry(Exception exception) {
+    if (exception instanceof UncheckedIOException) {
+      if (exception.getCause() instanceof EOFException) {
+        return false;
+      }
+    }
+
+    if (exception instanceof AwsServiceException) {
+      switch (((AwsServiceException) exception).statusCode()) {
+        case HttpURLConnection.HTTP_FORBIDDEN:
+        case HttpURLConnection.HTTP_BAD_REQUEST:
+          return false;
+      }
+    }
+
+    if (exception instanceof S3Exception) {
+      switch (((S3Exception) exception).statusCode()) {
+        case HttpURLConnection.HTTP_NOT_FOUND:
+        case 416: // range not satisfied
+          return false;
+      }
+    }
+
+    return true;

Review Comment:
   Should we move this into a separate util method? It could then be extended to all S3 interactions.
   
   Also, are there any pointers showing that this is the complete list, considering the APIs we use to connect to S3? For example, see the sample list in [S3A](https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java#L177-L231).





[GitHub] [iceberg] danielcweeks commented on pull request #4912: AWS: add retry logic to S3InputStream

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on PR #4912:
URL: https://github.com/apache/iceberg/pull/4912#issuecomment-1145156279

   I'd like to take a closer look at this. So please hold off on merging.




[GitHub] [iceberg] jackye1995 commented on a diff in pull request #4912: AWS: add retry logic to S3InputStream

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on code in PR #4912:
URL: https://github.com/apache/iceberg/pull/4912#discussion_r889726336


##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java:
##########
@@ -178,25 +281,56 @@ private void positionStream() throws IOException {
   }
 
   private void openStream() throws IOException {
-    GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder()
-        .bucket(location.bucket())
-        .key(location.key())
-        .range(String.format("bytes=%s-", pos));
-
-    S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
-
     closeStream();
-    stream = s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream());
+    stream = readRange(String.format("bytes=%s-", pos));
   }
 
   private void closeStream() throws IOException {
-    if (stream != null) {
-      stream.close();
+    closeServerSideStream(stream);
+    stream = null;
+  }
+
+  private static void closeServerSideStream(InputStream streamToClose) {
+    if (streamToClose != null) {
+      try {
+        if (streamToClose instanceof ResponseInputStream) {
+          // Stated in the ResponseInputStream javadoc:
+          // If it is not desired to read remaining data from the stream,
+          // you can explicitly abort the connection via abort().
+          ((ResponseInputStream<?>) streamToClose).abort();
+        } else {
+          streamToClose.close();
+        }
+      } catch (IOException | AbortedException e) {
+        // ignore failure to abort or close stream
+      }
     }
   }
 
-  public void setSkipSize(int skipSize) {
-    this.skipSize = skipSize;
+  private static boolean shouldRetry(Exception exception) {
+    if (exception instanceof UncheckedIOException) {
+      if (exception.getCause() instanceof EOFException) {
+        return false;
+      }
+    }
+
+    if (exception instanceof AwsServiceException) {
+      switch (((AwsServiceException) exception).statusCode()) {
+        case HttpURLConnection.HTTP_FORBIDDEN:
+        case HttpURLConnection.HTTP_BAD_REQUEST:
+          return false;
+      }
+    }
+
+    if (exception instanceof S3Exception) {
+      switch (((S3Exception) exception).statusCode()) {
+        case HttpURLConnection.HTTP_NOT_FOUND:
+        case 416: // range not satisfied
+          return false;
+      }
+    }
+
+    return true;

Review Comment:
   Let's discuss this in the thread above https://github.com/apache/iceberg/pull/4912#discussion_r889726156





[GitHub] [iceberg] danielcweeks commented on a diff in pull request #4912: AWS: add retry logic to S3InputStream

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on code in PR #4912:
URL: https://github.com/apache/iceberg/pull/4912#discussion_r898538575


##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java:
##########
@@ -88,23 +96,69 @@ public void seek(long newPos) {
 
   @Override
   public int read() throws IOException {
-    Preconditions.checkState(!closed, "Cannot read: already closed");
-    positionStream();
+    int[] byteRef = new int[1];
+    try {
+      Tasks.foreach(0)

Review Comment:
   This is a lot of duplicate code for the retry logic.  Could we consolidate to a method that wraps just the logic we want to execute?
   
   For example:
   
   ```java
   retry(() -> {
       Preconditions.checkState(!closed, "Cannot read: already closed");
       positionStream();
   
       byteRef[0] = stream.read();
   });
   ```
   
   (You may have to use a consumer/function for the range versions, but it seems like there might be a more concise way to write this.)
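
The consolidation suggested above could look roughly like the following. This is a minimal sketch, not the PR's code: `RetrySketch`, `IOAction`, and the `maxRetries` parameter are hypothetical names, a plain loop stands in for Iceberg's `Tasks` utility, and there is no backoff between attempts. Treating `EOFException` as non-retryable loosely follows `shouldRetry` in the diff.

```java
import java.io.EOFException;
import java.io.IOException;

/** Hypothetical helper; all names here are illustrative and not taken from the PR. */
final class RetrySketch {
  /** An action that may throw IOException, e.g. positioning a stream and reading from it. */
  @FunctionalInterface
  interface IOAction<T> {
    T run() throws IOException;
  }

  private RetrySketch() {}

  /**
   * Runs the action once and then retries up to maxRetries more times on IOException.
   * EOFException is rethrown immediately because retrying cannot help at end of stream.
   */
  static <T> T retry(int maxRetries, IOAction<T> action) throws IOException {
    if (maxRetries < 0) {
      throw new IllegalArgumentException("maxRetries must be >= 0");
    }
    IOException last = null;
    for (int attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        return action.run();
      } catch (EOFException e) {
        throw e; // non-retryable: end of stream
      } catch (IOException e) {
        last = e; // remember the failure and try again
      }
    }
    throw last; // every attempt failed; surface the most recent error
  }
}
```

With a helper of this shape, the single-byte read could shrink to something like `RetrySketch.retry(numRetries, () -> { positionStream(); return stream.read(); })`, and the range versions could return the byte count the same way (`numRetries` is assumed to come from the configured retry property).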





[GitHub] [iceberg] jackye1995 commented on a diff in pull request #4912: AWS: add retry logic to S3InputStream

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on code in PR #4912:
URL: https://github.com/apache/iceberg/pull/4912#discussion_r889725878


##########
aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java:
##########
@@ -353,6 +353,30 @@ public class AwsProperties implements Serializable {
   @Deprecated
   public static final boolean CLIENT_ENABLE_ETAG_CHECK_DEFAULT = false;
 
+  /**
+   * Number of times to retry S3 read operation.
+   */
+  public static final String S3_READ_RETRY_NUM_RETRIES = "s3.read.retry.num-retries";
+  public static final int S3_READ_RETRY_NUM_RETRIES_DEFAULT = 6;

Review Comment:
   Thanks for the pointer, updated.





[GitHub] [iceberg] danielcweeks commented on a diff in pull request #4912: AWS: add retry logic to S3InputStream

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on code in PR #4912:
URL: https://github.com/apache/iceberg/pull/4912#discussion_r898542769


##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java:
##########
@@ -172,31 +274,62 @@ private void positionStream() throws IOException {
     }
 
     // close the stream and open at desired position
-    LOG.debug("Seek with new stream for {} to offset {}", location, next);
+    LOG.warn("Seek with new stream for {} to offset {}", location, next);
     pos = next;
     openStream();
   }
 
-  private void openStream() throws IOException {
-    GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder()
-        .bucket(location.bucket())
-        .key(location.key())
-        .range(String.format("bytes=%s-", pos));
-
-    S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
-
+  private void openStream() {
     closeStream();
-    stream = s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream());
+    stream = readRange(String.format("bytes=%s-", pos));
+  }
+
+  private void closeStream() {
+    closeServerSideStream(stream);
+    stream = null;
   }
 
-  private void closeStream() throws IOException {
-    if (stream != null) {
-      stream.close();
+  private static void closeServerSideStream(InputStream streamToClose) {

Review Comment:
   I'd just put this in `closeStream` (rather than having a separate method for it).
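
Folding the abort-or-close logic into `closeStream` might look like the sketch below. `AbortableSketch` is a hypothetical stand-in for the SDK's `ResponseInputStream`, used only so the example compiles without the AWS SDK, and `StreamHolderSketch` stands in for `S3InputStream`. Catching `RuntimeException` is broader than the `AbortedException` caught in the diff and is an assumption of this sketch.

```java
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical stand-in for a response stream that supports abort(). */
interface AbortableSketch {
  void abort();
}

/** Hypothetical holder that plays the role of S3InputStream in this sketch. */
final class StreamHolderSketch {
  private InputStream stream;

  StreamHolderSketch(InputStream stream) {
    this.stream = stream;
  }

  /** Aborts or closes the current stream, ignoring failures, then clears the field. */
  void closeStream() {
    if (stream == null) {
      return;
    }
    try {
      if (stream instanceof AbortableSketch) {
        // Abort instead of close so remaining bytes are not drained from the connection.
        ((AbortableSketch) stream).abort();
      } else {
        stream.close();
      }
    } catch (IOException | RuntimeException e) {
      // Ignore failure to abort or close the stream.
    } finally {
      stream = null;
    }
  }

  boolean isOpen() {
    return stream != null;
  }
}
```

Keeping everything in one method means the null check, the abort path, and the field reset live together, at the cost of losing a static helper that could close a stream other than the current one.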





[GitHub] [iceberg] singhpk234 commented on a diff in pull request #4912: AWS: add retry logic to S3InputStream

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on code in PR #4912:
URL: https://github.com/apache/iceberg/pull/4912#discussion_r885877917


##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java:
##########
@@ -178,25 +281,56 @@ private void positionStream() throws IOException {
   }
 
   private void openStream() throws IOException {
-    GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder()
-        .bucket(location.bucket())
-        .key(location.key())
-        .range(String.format("bytes=%s-", pos));
-
-    S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
-
     closeStream();
-    stream = s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream());
+    stream = readRange(String.format("bytes=%s-", pos));
   }
 
   private void closeStream() throws IOException {
-    if (stream != null) {
-      stream.close();
+    closeServerSideStream(stream);
+    stream = null;
+  }
+
+  private static void closeServerSideStream(InputStream streamToClose) {
+    if (streamToClose != null) {
+      try {
+        if (streamToClose instanceof ResponseInputStream) {
+          // Stated in the ResponseInputStream javadoc:
+          // If it is not desired to read remaining data from the stream,
+          // you can explicitly abort the connection via abort().
+          ((ResponseInputStream<?>) streamToClose).abort();
+        } else {
+          streamToClose.close();
+        }
+      } catch (IOException | AbortedException e) {
+        // ignore failure to abort or close stream
+      }
     }
   }
 
-  public void setSkipSize(int skipSize) {
-    this.skipSize = skipSize;
+  private static boolean shouldRetry(Exception exception) {
+    if (exception instanceof UncheckedIOException) {
+      if (exception.getCause() instanceof EOFException) {
+        return false;
+      }
+    }
+
+    if (exception instanceof AwsServiceException) {
+      switch (((AwsServiceException) exception).statusCode()) {
+        case HttpURLConnection.HTTP_FORBIDDEN:
+        case HttpURLConnection.HTTP_BAD_REQUEST:
+          return false;
+      }
+    }
+
+    if (exception instanceof S3Exception) {
+      switch (((S3Exception) exception).statusCode()) {
+        case HttpURLConnection.HTTP_NOT_FOUND:
+        case 416: // range not satisfied
+          return false;
+      }
+    }
+
+    return true;

Review Comment:
   Should we move this into a separate util class? It could then be extended to all S3 interactions.
   
   Also, do we know whether this list is complete for the APIs we use to connect to S3? For example, S3A has a sample list: [S3A](https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java#L177-L231)
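
One possible shape for such a shared utility is sketched below. It is an assumption-laden illustration rather than the PR's code: `S3RetryUtilSketch` and its method names are hypothetical, and it takes a raw HTTP status code so that it compiles without the AWS SDK. A real version would read the status from `AwsServiceException` or `S3Exception`, and the diff checks 400/403 and 404/416 against those two types separately, which this sketch merges into one list.

```java
import java.io.EOFException;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;

/** Hypothetical shared utility; not part of the PR as shown. */
final class S3RetryUtilSketch {
  // java.net.HttpURLConnection has no constant for 416, so define one locally.
  private static final int HTTP_RANGE_NOT_SATISFIABLE = 416;

  private S3RetryUtilSketch() {}

  /** Returns false for the status codes the diff treats as non-retryable (400, 403, 404, 416). */
  static boolean isRetryableStatus(int statusCode) {
    switch (statusCode) {
      case HttpURLConnection.HTTP_BAD_REQUEST:
      case HttpURLConnection.HTTP_FORBIDDEN:
      case HttpURLConnection.HTTP_NOT_FOUND:
      case HTTP_RANGE_NOT_SATISFIABLE:
        return false;
      default:
        return true;
    }
  }

  /** Returns false when the failure is an end-of-stream wrapped in UncheckedIOException. */
  static boolean shouldRetry(Exception exception) {
    if (exception instanceof UncheckedIOException
        && exception.getCause() instanceof EOFException) {
      return false;
    }
    return true;
  }
}
```

Whether this list is complete is exactly the open question in the comment; the sketch only centralizes the cases already present in the diff so that further ones, such as those in the linked S3A policy, would have a single place to go.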


