Posted to commits@tika.apache.org by ta...@apache.org on 2023/03/29 18:43:03 UTC

[tika] branch main updated: improve backoff logic (#1048)

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 536fdcfbb improve backoff logic (#1048)
536fdcfbb is described below

commit 536fdcfbbf3255d11be9331196dc17a23fd88423
Author: Tim Allison <ta...@apache.org>
AuthorDate: Wed Mar 29 14:42:55 2023 -0400

    improve backoff logic (#1048)
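
    The fetcher now takes a throttleSeconds schedule: a comma-delimited list of
    per-retry sleep intervals, in seconds, whose length also bounds the number
    of fetch attempts. The old retries/sleepBeforeRetryMillis settings are
    deprecated and, when throttleSeconds is not set, are mapped onto an
    equivalent schedule at initialization.

    A minimal sketch of setting the schedule programmatically (the setter comes
    from this patch; constructing the fetcher directly and the class name
    ThrottleConfigSketch are only for illustration; in practice the fetcher is
    built from tika-config XML as in the integration-test config below):

        import org.apache.tika.exception.TikaConfigException;
        import org.apache.tika.pipes.fetcher.s3.S3Fetcher;

        public class ThrottleConfigSketch {
            public static void main(String[] args) throws TikaConfigException {
                S3Fetcher fetcher = new S3Fetcher();
                //sleep 30s, 120s, 600s, then 1200s between failed attempts
                fetcher.setThrottleSeconds("30,120,600,1200");
            }
        }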
---
 .../resources/tika-config-s3-integration-test.xml  |   1 +
 .../apache/tika/pipes/fetcher/s3/S3Fetcher.java    | 104 +++++++++++++++++----
 2 files changed, 86 insertions(+), 19 deletions(-)
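
For context on the deprecations in the patch below: when throttleSeconds is not
set, initialize() now derives a schedule from the deprecated retries and
sleepBeforeRetryMillis settings. A rough illustration of that mapping (retries=3
is an example value; 30000 ms is the default sleep; the class name
BackoffMappingSketch is only for illustration):

    import java.util.Arrays;

    public class BackoffMappingSketch {
        public static void main(String[] args) {
            int retries = 3;                     //deprecated setting (default is 0)
            long sleepBeforeRetryMillis = 30000; //deprecated setting (default)
            //equivalent schedule: three 30-second intervals, i.e. "30,30,30"
            long[] throttleSeconds = new long[retries];
            for (int i = 0; i < retries; i++) {
                throttleSeconds[i] = sleepBeforeRetryMillis / 1000;
            }
            System.out.println(Arrays.toString(throttleSeconds)); //[30, 30, 30]
        }
    }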

diff --git a/tika-integration-tests/tika-pipes-s3-integration-tests/src/test/resources/tika-config-s3-integration-test.xml b/tika-integration-tests/tika-pipes-s3-integration-tests/src/test/resources/tika-config-s3-integration-test.xml
index 7ddad32d3..8ef6cab29 100644
--- a/tika-integration-tests/tika-pipes-s3-integration-tests/src/test/resources/tika-config-s3-integration-test.xml
+++ b/tika-integration-tests/tika-pipes-s3-integration-tests/src/test/resources/tika-config-s3-integration-test.xml
@@ -87,6 +87,7 @@
       <secretKey>{SECRET_KEY}</secretKey>
       <endpointConfigurationService>{ENDPOINT_CONFIGURATION_SERVICE}</endpointConfigurationService>
       <pathStyleAccessEnabled>true</pathStyleAccessEnabled>
+      <throttleSeconds>30,120,600,1200</throttleSeconds>
     </fetcher>
   </fetchers>
   <pipesIterator class="org.apache.tika.pipes.pipesiterator.s3.S3PipesIterator">
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
index 12774fda0..d91a06aa7 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
@@ -24,7 +24,9 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.ClientConfiguration;
@@ -36,6 +38,7 @@ import com.amazonaws.auth.profile.ProfileCredentialsProvider;
 import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.S3Object;
 import org.slf4j.Logger;
@@ -65,6 +68,22 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
 
     private static final Logger LOGGER = LoggerFactory.getLogger(S3Fetcher.class);
     private static final String PREFIX = "s3";
+
+    //Do not retry if an AmazonS3Exception carries one of these error codes.
+    private static final Set<String> NO_RETRY_ERROR_CODES = new HashSet<>();
+
+    //Keep this private so that we can change it as needed.
+    //It isn't clear whether an allow list (retry only on throttling errors such as
+    //"too many requests") or this deny list (never retry on these S3 error codes) is better.
+    static {
+        NO_RETRY_ERROR_CODES.add("AccessDenied");
+        NO_RETRY_ERROR_CODES.add("NoSuchKey");
+        NO_RETRY_ERROR_CODES.add("ExpiredToken");
+        NO_RETRY_ERROR_CODES.add("InvalidAccessKeyId");
+        NO_RETRY_ERROR_CODES.add("InvalidRange");
+        NO_RETRY_ERROR_CODES.add("InvalidRequest");
+
+    }
     private final Object[] clientLock = new Object[0];
     private String region;
     private String bucket;
@@ -80,6 +99,9 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
     private boolean spoolToTemp = true;
     private int retries = 0;
     private long sleepBeforeRetryMillis = 30000;
+
+    private long[] throttleSeconds = null;
+
     private long maxLength = -1;
     private boolean pathStyleAccessEnabled = false;
 
@@ -104,35 +126,37 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
         }
         int tries = 0;
         IOException ex = null;
-        while (tries <= retries) {
-            if (tries > 0) {
-                LOGGER.warn("sleeping for {} milliseconds before retry",
-                        sleepBeforeRetryMillis);
-                try {
-                    Thread.sleep(sleepBeforeRetryMillis);
-                } catch (InterruptedException e) {
-                    throw new RuntimeException("interrupted");
-                }
-                LOGGER.info("trying to re-initialize S3 client");
-                initialize(new HashMap<>());
-            }
+        do {
             try {
                 long start = System.currentTimeMillis();
                 InputStream is = _fetch(theFetchKey, metadata, startRange, endRange);
                 long elapsed = System.currentTimeMillis() - start;
                 LOGGER.debug("total to fetch {}", elapsed);
                 return is;
+            } catch (AmazonS3Exception e) {
+                if (e.getErrorCode() != null && NO_RETRY_ERROR_CODES.contains(e.getErrorCode())) {
+                    LOGGER.warn("Hit a no retry error code. Not retrying." + tries, e);
+                    throw new IOException(e);
+                }
+                LOGGER.warn("client exception fetching on retry=" + tries, e);
+                ex = new IOException(e);
             } catch (AmazonClientException e) {
-                //TODO -- filter exceptions -- if the file doesn't exist, don't retry
                 LOGGER.warn("client exception fetching on retry=" + tries, e);
                 ex = new IOException(e);
             } catch (IOException e) {
-                //TODO -- filter exceptions -- if the file doesn't exist, don't retry
                 LOGGER.warn("client exception fetching on retry=" + tries, e);
                 ex = e;
             }
-            tries++;
-        }
+            LOGGER.warn("sleeping for {} seconds before retry", throttleSeconds[tries]);
+            try {
+                Thread.sleep(throttleSeconds[tries]);
+            } catch (InterruptedException e) {
+                throw new RuntimeException("interrupted");
+            }
+            LOGGER.debug("trying to re-initialize S3 client");
+            initialize(new HashMap<>());
+        } while (++tries < throttleSeconds.length);
+
         throw ex;
     }
 
@@ -171,7 +195,8 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
                 start = System.currentTimeMillis();
                 tmp = new TemporaryResources();
                 Path tmpPath = tmp.createTempFile(FilenameUtils.getSuffixFromPath(fetchKey));
-                Files.copy(s3Object.getObjectContent(), tmpPath, StandardCopyOption.REPLACE_EXISTING);
+                Files.copy(s3Object.getObjectContent(), tmpPath,
+                        StandardCopyOption.REPLACE_EXISTING);
                 TikaInputStream tis = TikaInputStream.get(tmpPath, metadata, tmp);
                 LOGGER.debug("took {} ms to fetch metadata and copy to local tmp file",
                         System.currentTimeMillis() - start);
@@ -205,6 +230,32 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
         this.bucket = bucket;
     }
 
+    /**
+     * Set the per-retry sleep intervals, in seconds, as a comma-delimited list,
+     * e.g. 30,60,120,600. The number of entries bounds the number of fetch attempts.
+     * @param commaDelimitedLongs comma-delimited list of sleep intervals in seconds
+     * @throws TikaConfigException if a value cannot be parsed as a long
+     */
+    @Field
+    public void setThrottleSeconds(String commaDelimitedLongs) throws TikaConfigException {
+        String[] longStrings = commaDelimitedLongs.split(",");
+        long[] seconds = new long[longStrings.length];
+        for (int i = 0; i < longStrings.length; i++) {
+            try {
+                seconds[i] = Long.parseLong(longStrings[i]);
+            } catch (NumberFormatException e) {
+                throw new TikaConfigException("couldn't parse '" + longStrings[i] + "' as a long", e);
+            }
+        }
+        setThrottleSeconds(seconds);
+    }
+
+    public void setThrottleSeconds(long[] throttleSeconds) {
+        this.throttleSeconds = throttleSeconds;
+    }
+
+    public long[] getThrottleSeconds() {
+        return throttleSeconds;
+    }
+
     /**
      * prefix to prepend to the fetch key before fetching.
      * This will automatically add a '/' at the end.
@@ -236,7 +287,11 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
     }
 
+    /**
+     * @deprecated use {@link #setThrottleSeconds(String)} instead
+     */
     @Field
     public void setRetries(int retries) {
+        LOGGER.info("retries is deprecated. Use setThrottleSeconds instead.");
         this.retries = retries;
     }
 
@@ -255,8 +310,13 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
         this.maxLength = maxLength;
     }
 
+    /**
+     * @param sleepBeforeRetryMillis amount of time, in milliseconds, to sleep after a failure
+     * @deprecated use {@link #setThrottleSeconds(String)} instead
+     */
     @Field
     public void setSleepBeforeRetryMillis(long sleepBeforeRetryMillis) {
+        LOGGER.info("sleepBeforeRetryMillis is deprecated. Use setThrottleSeconds instead");
         this.sleepBeforeRetryMillis = sleepBeforeRetryMillis;
     }
 
@@ -286,7 +346,8 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
         } else if (credentialsProvider.equals("profile")) {
             provider = new ProfileCredentialsProvider(profile);
         } else if (credentialsProvider.equals("key_secret")) {
-            provider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey));
+            provider =
+                    new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey));
         } else {
             throw new TikaConfigException("credentialsProvider must be set and " +
                     "must be either 'instance', 'profile' or 'key_secret'");
@@ -307,11 +368,16 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
                     amazonS3ClientBuilder.withRegion(region);
                 }
                 s3Client = amazonS3ClientBuilder.build();
-
             }
         } catch (AmazonClientException e) {
             throw new TikaConfigException("can't initialize s3 fetcher", e);
         }
+        if (throttleSeconds == null) {
+            //backwards compatibility: build a schedule from the deprecated
+            //retries/sleepBeforeRetryMillis settings, converting milliseconds to seconds
+            throttleSeconds = new long[retries];
+            for (int i = 0; i < retries; i++) {
+                throttleSeconds[i] = sleepBeforeRetryMillis / 1000;
+            }
+        }
     }
 
     @Override