Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/06/01 01:17:48 UTC

[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #4912: AWS: add retry logic to S3InputStream

amogh-jahagirdar commented on code in PR #4912:
URL: https://github.com/apache/iceberg/pull/4912#discussion_r886199393


##########
aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java:
##########
@@ -353,6 +353,30 @@ public class AwsProperties implements Serializable {
   @Deprecated
   public static final boolean CLIENT_ENABLE_ETAG_CHECK_DEFAULT = false;
 
+  /**
+   * Number of times to retry S3 read operation.
+   */
+  public static final String S3_READ_RETRY_NUM_RETRIES = "s3.read.retry.num-retries";
+  public static final int S3_READ_RETRY_NUM_RETRIES_DEFAULT = 6;

Review Comment:
   Just want to confirm: how were these default values selected? Were they taken from the Hadoop S3A configuration?



##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java:
##########
@@ -178,25 +281,56 @@ private void positionStream() throws IOException {
   }
 
   private void openStream() throws IOException {
-    GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder()
-        .bucket(location.bucket())
-        .key(location.key())
-        .range(String.format("bytes=%s-", pos));
-
-    S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
-
     closeStream();
-    stream = s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream());
+    stream = readRange(String.format("bytes=%s-", pos));
   }
 
   private void closeStream() throws IOException {
-    if (stream != null) {
-      stream.close();
+    closeServerSideStream(stream);
+    stream = null;
+  }
+
+  private static void closeServerSideStream(InputStream streamToClose) {
+    if (streamToClose != null) {
+      try {
+        if (streamToClose instanceof ResponseInputStream) {
+          // Stated in the ResponseInputStream javadoc:
+          // If it is not desired to read remaining data from the stream,
+          // you can explicitly abort the connection via abort().
+          ((ResponseInputStream<?>) streamToClose).abort();
+        } else {
+          streamToClose.close();
+        }
+      } catch (IOException | AbortedException e) {
+        // ignore failure to abort or close stream
+      }
     }
   }
 
-  public void setSkipSize(int skipSize) {
-    this.skipSize = skipSize;
+  private static boolean shouldRetry(Exception exception) {
+    if (exception instanceof UncheckedIOException) {
+      if (exception.getCause() instanceof EOFException) {
+        return false;
+      }
+    }
+
+    if (exception instanceof AwsServiceException) {
+      switch (((AwsServiceException) exception).statusCode()) {
+        case HttpURLConnection.HTTP_FORBIDDEN:
+        case HttpURLConnection.HTTP_BAD_REQUEST:
+          return false;
+      }
+    }
+
+    if (exception instanceof S3Exception) {
+      switch (((S3Exception) exception).statusCode()) {
+        case HttpURLConnection.HTTP_NOT_FOUND:
+        case 416: // range not satisfied
+          return false;
+      }
+    }
+
+    return true;

Review Comment:
   Curious: is this the same retry policy that Hadoop S3A uses?
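
   To make the question concrete, here is a rough, dependency-free sketch of the classification in the hunk above. StatusException and RetryPolicySketch are hypothetical stand-ins for the SDK's AwsServiceException/S3Exception and are not AWS SDK or Iceberg types. It only restates what the diff does and makes no claim about what S3A does.

```java
import java.io.EOFException;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;

// Sketch only: mirrors the shouldRetry logic from the diff using plain JDK types.
final class RetryPolicySketch {

  // Hypothetical stand-in for an SDK exception that carries an HTTP status code.
  static final class StatusException extends RuntimeException {
    private final int statusCode;

    StatusException(int statusCode) {
      super("HTTP " + statusCode);
      this.statusCode = statusCode;
    }

    int statusCode() {
      return statusCode;
    }
  }

  // Returns false for failures that a retry cannot fix, true for everything else.
  static boolean shouldRetry(Exception exception) {
    // Reading past the end of the object will fail the same way every time.
    if (exception instanceof UncheckedIOException
        && exception.getCause() instanceof EOFException) {
      return false;
    }

    if (exception instanceof StatusException) {
      switch (((StatusException) exception).statusCode()) {
        case HttpURLConnection.HTTP_BAD_REQUEST: // 400
        case HttpURLConnection.HTTP_FORBIDDEN:   // 403
        case HttpURLConnection.HTTP_NOT_FOUND:   // 404
        case 416:                                // range not satisfiable
          return false;
      }
    }

    // Anything else (for example a 5xx or a connection reset) is treated as transient.
    return true;
  }
}
```

   Note that in this shape every status code not listed is retried, so the open question is whether that default matches the policy we want.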



##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java:
##########
@@ -88,23 +97,68 @@ public void seek(long newPos) {
 
   @Override
   public int read() throws IOException {
-    Preconditions.checkState(!closed, "Cannot read: already closed");
-    positionStream();
+    AtomicInteger byteRef = new AtomicInteger(0);
+    try {
+      Tasks.foreach(0)

Review Comment:
   Is there a reason we don't pass the stream into foreach? It's a private field, so passing it in as an argument is a bit awkward, but that still seems more readable than Tasks.foreach(0), IMO.
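
   Roughly the shape I have in mind, as a sketch rather than a concrete suggestion. readWithRetry and RetryOverItemSketch are hypothetical names using only the JDK, not Iceberg's Tasks API. It also assumes the same stream object stays valid across attempts, which may not hold here if the stream is reopened between retries.

```java
import java.util.function.ToIntFunction;

// Sketch only: the retried operation receives the item it works on,
// instead of iterating over a dummy placeholder value.
final class RetryOverItemSketch {

  // Hypothetical helper: applies task to item, retrying on RuntimeException
  // until it succeeds or maxAttempts is exhausted.
  static <T> int readWithRetry(T item, int maxAttempts, ToIntFunction<T> task) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }

    RuntimeException lastFailure = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return task.applyAsInt(item);
      } catch (RuntimeException e) {
        lastFailure = e; // remember the failure and try again
      }
    }

    // All attempts failed: surface the most recent failure.
    throw lastFailure;
  }
}
```

   With something like this the call site names what is being read, for example readWithRetry(stream, retries, s -> ...), which is the readability point.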



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org