You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by al...@apache.org on 2021/04/07 12:48:04 UTC

[asterixdb] 23/25: [ASTERIXDB-2858][EXT]: Retry upon failure for S3 retryable errors

This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 5b5ac48f8829b63c906b4f28e6a959137be2b163
Author: Hussain Towaileb <Hu...@Couchbase.com>
AuthorDate: Tue Apr 6 15:28:04 2021 +0300

    [ASTERIXDB-2858][EXT]: Retry upon failure for S3 retryable errors
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - Retry upon failure for S3 retryable errors.
    
    Change-Id: Icec828b119fd959760281c4f4cc49449b779546e
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10743
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Hussain Towaileb <hu...@gmail.com>
    Reviewed-by: Till Westmann
---
 .../abstracts/AbstractExternalInputStream.java     |  3 -
 .../input/record/reader/aws/AwsS3InputStream.java  | 74 +++++++++++++++++++---
 .../record/reader/aws/AwsS3InputStreamFactory.java |  2 +-
 .../external/util/ExternalDataConstants.java       |  9 +++
 .../asterix/external/util/ExternalDataUtils.java   |  3 +-
 .../input/record/reader/awss3/AwsS3Test.java       | 72 +++++++++++++++++++++
 6 files changed, 149 insertions(+), 14 deletions(-)

diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java
index 898f828..b37bce7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java
@@ -52,9 +52,6 @@ public abstract class AbstractExternalInputStream extends AbstractMultipleInputS
 
         // Finished reading all the files
         if (nextFileIndex >= filePaths.size()) {
-            if (in != null) {
-                CleanupUtils.close(in, null);
-            }
             return false;
         }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index 48035f3..4288cc4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -23,6 +23,7 @@ import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.zip.GZIPInputStream;
 
 import org.apache.asterix.common.exceptions.CompilationException;
@@ -31,18 +32,22 @@ import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.util.LogRedactionUtil;
 
 import software.amazon.awssdk.core.exception.SdkException;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
 import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+import software.amazon.awssdk.services.s3.model.S3Exception;
 
 public class AwsS3InputStream extends AbstractExternalInputStream {
 
     private final S3Client s3Client;
     private final String bucket;
+    private static final int MAX_RETRIES = 5; // We will retry 5 times in case of internal error from AWS S3 service
 
     public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException {
         super(configuration, filePaths);
@@ -58,24 +63,75 @@ public class AwsS3InputStream extends AbstractExternalInputStream {
 
         // Have a reference to the S3 stream to ensure that if GZipInputStream causes an IOException because of reading
         // the header, then the S3 stream gets closed in the close method
-        try {
-            in = s3Client.getObject(getObjectRequest);
-        } catch (NoSuchKeyException ex) {
-            LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(getObjectRequest.key()) + " was not found in bucket "
-                    + getObjectRequest.bucket());
+        if (!doGetInputStream(getObjectRequest)) {
             return false;
-        } catch (SdkException ex) {
-            throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
         }
 
         // Use gzip stream if needed
-        String lowerCaseFileName = fileName.toLowerCase();
-        if (lowerCaseFileName.endsWith(".gz") || lowerCaseFileName.endsWith(".gzip")) {
+        if (StringUtils.endsWithIgnoreCase(fileName, ".gz") || StringUtils.endsWithIgnoreCase(fileName, ".gzip")) {
             in = new GZIPInputStream(in, ExternalDataConstants.DEFAULT_BUFFER_SIZE);
         }
         return true;
     }
 
+    /**
+     * Get the input stream. If an error is encountered, depending on the error code, a retry might be favorable.
+     *
+     * @return true
+     */
+    private boolean doGetInputStream(GetObjectRequest request) throws RuntimeDataException {
+        int retries = 0;
+        while (retries < MAX_RETRIES) {
+            try {
+                in = s3Client.getObject(request);
+                break;
+            } catch (NoSuchKeyException ex) {
+                LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(request.key()) + " was not found in bucket "
+                        + request.bucket());
+                return false;
+            } catch (S3Exception ex) {
+                if (!shouldRetry(ex.awsErrorDetails().errorCode(), retries++)) {
+                    throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+                }
+                LOGGER.debug(() -> "S3 retryable error: " + LogRedactionUtil.userData(ex.getMessage()));
+
+                // Backoff for 1 sec for the first 2 retries, and 2 seconds from there onward
+                try {
+                    Thread.sleep(TimeUnit.SECONDS.toMillis(retries < 3 ? 1 : 2));
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            } catch (SdkException ex) {
+                throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+            }
+        }
+        return true;
+    }
+
+    private boolean shouldRetry(String errorCode, int currentRetry) {
+        return currentRetry < MAX_RETRIES && AwsS3.isRetryableError(errorCode);
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (in != null) {
+            CleanupUtils.close(in, null);
+        }
+        if (s3Client != null) {
+            CleanupUtils.close(s3Client, null);
+        }
+    }
+
+    @Override
+    public boolean stop() {
+        try {
+            close();
+        } catch (IOException e) {
+            // Ignore
+        }
+        return false;
+    }
+
     private S3Client buildAwsS3Client(Map<String, String> configuration) throws HyracksDataException {
         try {
             return ExternalDataUtils.AwsS3.buildAwsS3Client(configuration);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 747fcca..8197524 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -78,7 +78,7 @@ public class AwsS3InputStreamFactory extends AbstractExternalInputStreamFactory
             // New API is not implemented, try falling back to old API
             try {
                 // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
-                if (ex.awsErrorDetails().errorCode().equals("NotImplemented")) {
+                if (ex.awsErrorDetails().errorCode().equals(ExternalDataConstants.AwsS3.ERROR_METHOD_NOT_IMPLEMENTED)) {
                     filesOnly = oldApiListS3Objects(s3Client, container, includeExcludeMatcher);
                 } else {
                     throw ex;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 45d15df..047fe2a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -300,6 +300,15 @@ public class ExternalDataConstants {
         public static final String SECRET_ACCESS_KEY_FIELD_NAME = "secretAccessKey";
         public static final String SESSION_TOKEN_FIELD_NAME = "sessionToken";
         public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint";
+
+        // AWS S3 specific error codes
+        public static final String ERROR_INTERNAL_ERROR = "InternalError";
+        public static final String ERROR_SLOW_DOWN = "SlowDown";
+        public static final String ERROR_METHOD_NOT_IMPLEMENTED = "NotImplemented";
+
+        public static boolean isRetryableError(String errorCode) {
+            return errorCode.equals(ERROR_INTERNAL_ERROR) || errorCode.equals(ERROR_SLOW_DOWN);
+        }
     }
 
     public static class AzureBlob {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index d1bbe89..2d25d08 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.external.util;
 
+import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_METHOD_NOT_IMPLEMENTED;
 import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.ACCOUNT_KEY_FIELD_NAME;
 import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.ACCOUNT_NAME_FIELD_NAME;
 import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.BLOB_ENDPOINT_FIELD_NAME;
@@ -785,7 +786,7 @@ public class ExternalDataUtils {
                 // Method not implemented, try falling back to old API
                 try {
                     // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
-                    if (ex.awsErrorDetails().errorCode().equals("NotImplemented")) {
+                    if (ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
                         useOldApi = true;
                         response = isBucketEmpty(s3Client, container, prefix, true);
                     } else {
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
index 9cdeb16..90ea04b 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
@@ -18,16 +18,26 @@
  */
 package org.apache.asterix.external.input.record.reader.awss3;
 
+import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_INTERNAL_ERROR;
+import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_SLOW_DOWN;
+
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
+import org.apache.asterix.external.input.record.reader.aws.AwsS3InputStream;
 import org.apache.asterix.external.input.record.reader.aws.AwsS3InputStreamFactory;
+import org.apache.hyracks.api.exceptions.IFormattedException;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.S3Exception;
 import software.amazon.awssdk.services.s3.model.S3Object;
 
 public class AwsS3Test {
@@ -70,4 +80,66 @@ public class AwsS3Test {
             Assert.assertEquals(workload.getTotalSize(), 600);
         }
     }
+
+    @Test
+    public void s3InternalError() throws Exception {
+        // S3Client mock
+        S3Client s3ClientMock = Mockito.mock(S3Client.class);
+
+        // Prepare S3Exception with internal error code
+        AwsErrorDetails errorDetails = AwsErrorDetails.builder().errorCode(ERROR_INTERNAL_ERROR)
+                .errorMessage("Internal Error from AWS").build();
+        S3Exception internalErrorEx = (S3Exception) S3Exception.builder().awsErrorDetails(errorDetails).build();
+        Mockito.when(s3ClientMock.getObject(GetObjectRequest.builder().build())).thenThrow(internalErrorEx);
+
+        // Set S3Client mock
+        AwsS3InputStream inputStreamMock = Mockito.mock(AwsS3InputStream.class);
+        Field s3ClientField = AwsS3InputStream.class.getDeclaredField("s3Client");
+        s3ClientField.setAccessible(true);
+        s3ClientField.set(inputStreamMock, s3ClientMock);
+
+        // doGetInputStream method
+        Method doGetInputStreamMethod =
+                AwsS3InputStream.class.getDeclaredMethod("doGetInputStream", GetObjectRequest.class);
+        doGetInputStreamMethod.setAccessible(true);
+
+        try {
+            doGetInputStreamMethod.invoke(inputStreamMock, GetObjectRequest.builder().build());
+        } catch (Exception ex) {
+            Assert.assertTrue("Not internal error", ex.getCause() instanceof IFormattedException
+                    && ex.getCause().toString().contains("ASX1108: External source error. Internal Error from AWS"));
+        }
+    }
+
+    @Test
+    public void s3SlowDown() throws Exception {
+        // S3Client mock
+        S3Client s3ClientMock = Mockito.mock(S3Client.class);
+
+        // Prepare S3Exception with slow down error code
+        AwsErrorDetails errorDetails =
+                AwsErrorDetails.builder().errorCode(ERROR_SLOW_DOWN).errorMessage("SlowDown Error from AWS").build();
+        S3Exception slowDownEx = (S3Exception) S3Exception.builder().awsErrorDetails(errorDetails).build();
+        Mockito.when(s3ClientMock.getObject(GetObjectRequest.builder().build())).thenThrow(slowDownEx);
+
+        // Set S3Client mock
+        AwsS3InputStream inputStreamMock = Mockito.mock(AwsS3InputStream.class);
+
+        // Set S3Client
+        Field s3ClientField = AwsS3InputStream.class.getDeclaredField("s3Client");
+        s3ClientField.setAccessible(true);
+        s3ClientField.set(inputStreamMock, s3ClientMock);
+
+        // doGetInputStream method
+        Method doGetInputStreamMethod =
+                AwsS3InputStream.class.getDeclaredMethod("doGetInputStream", GetObjectRequest.class);
+        doGetInputStreamMethod.setAccessible(true);
+
+        try {
+            doGetInputStreamMethod.invoke(inputStreamMock, GetObjectRequest.builder().build());
+        } catch (Exception ex) {
+            Assert.assertTrue("Not SlowDown error", ex.getCause() instanceof IFormattedException
+                    && ex.getCause().toString().contains("ASX1108: External source error. SlowDown Error from AWS"));
+        }
+    }
 }