You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2023/03/29 18:43:03 UTC
[tika] branch main updated: improve backoff logic (#1048)
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new 536fdcfbb improve backoff logic (#1048)
536fdcfbb is described below
commit 536fdcfbbf3255d11be9331196dc17a23fd88423
Author: Tim Allison <ta...@apache.org>
AuthorDate: Wed Mar 29 14:42:55 2023 -0400
improve backoff logic (#1048)
---
.../resources/tika-config-s3-integration-test.xml | 1 +
.../apache/tika/pipes/fetcher/s3/S3Fetcher.java | 104 +++++++++++++++++----
2 files changed, 86 insertions(+), 19 deletions(-)
diff --git a/tika-integration-tests/tika-pipes-s3-integration-tests/src/test/resources/tika-config-s3-integration-test.xml b/tika-integration-tests/tika-pipes-s3-integration-tests/src/test/resources/tika-config-s3-integration-test.xml
index 7ddad32d3..8ef6cab29 100644
--- a/tika-integration-tests/tika-pipes-s3-integration-tests/src/test/resources/tika-config-s3-integration-test.xml
+++ b/tika-integration-tests/tika-pipes-s3-integration-tests/src/test/resources/tika-config-s3-integration-test.xml
@@ -87,6 +87,7 @@
<secretKey>{SECRET_KEY}</secretKey>
<endpointConfigurationService>{ENDPOINT_CONFIGURATION_SERVICE}</endpointConfigurationService>
<pathStyleAccessEnabled>true</pathStyleAccessEnabled>
+ <throttleSeconds>30,120,600,1200</throttleSeconds>
</fetcher>
</fetchers>
<pipesIterator class="org.apache.tika.pipes.pipesiterator.s3.S3PipesIterator">
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
index 12774fda0..d91a06aa7 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
@@ -24,7 +24,9 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import com.amazonaws.AmazonClientException;
import com.amazonaws.ClientConfiguration;
@@ -36,6 +38,7 @@ import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import org.slf4j.Logger;
@@ -65,6 +68,22 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
private static final Logger LOGGER = LoggerFactory.getLogger(S3Fetcher.class);
private static final String PREFIX = "s3";
+
+ //Do not retry if there's an AmazonS3Exception with this error code
+ private static final Set<String> NO_RETRY_ERROR_CODES = new HashSet<>();
+
+ //Keep this private so that we can change it as needed.
+ //Not sure whether it is better to have an accept list (only throttle on too many requests)
+ //or this deny list, i.e. don't throttle for these S3 exceptions.
+ static {
+ NO_RETRY_ERROR_CODES.add("AccessDenied");
+ NO_RETRY_ERROR_CODES.add("NoSuchKey");
+ NO_RETRY_ERROR_CODES.add("ExpiredToken");
+ NO_RETRY_ERROR_CODES.add("InvalidAccessKeyId");
+ NO_RETRY_ERROR_CODES.add("InvalidRange");
+ NO_RETRY_ERROR_CODES.add("InvalidRequest");
+
+ }
private final Object[] clientLock = new Object[0];
private String region;
private String bucket;
@@ -80,6 +99,9 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
private boolean spoolToTemp = true;
private int retries = 0;
private long sleepBeforeRetryMillis = 30000;
+
+ private long[] throttleSeconds = null;
+
private long maxLength = -1;
private boolean pathStyleAccessEnabled = false;
@@ -104,35 +126,37 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
}
int tries = 0;
IOException ex = null;
- while (tries <= retries) {
- if (tries > 0) {
- LOGGER.warn("sleeping for {} milliseconds before retry",
- sleepBeforeRetryMillis);
- try {
- Thread.sleep(sleepBeforeRetryMillis);
- } catch (InterruptedException e) {
- throw new RuntimeException("interrupted");
- }
- LOGGER.info("trying to re-initialize S3 client");
- initialize(new HashMap<>());
- }
+ do {
try {
long start = System.currentTimeMillis();
InputStream is = _fetch(theFetchKey, metadata, startRange, endRange);
long elapsed = System.currentTimeMillis() - start;
LOGGER.debug("total to fetch {}", elapsed);
return is;
+ } catch (AmazonS3Exception e) {
+ if (e.getErrorCode() != null && NO_RETRY_ERROR_CODES.contains(e.getErrorCode())) {
+ LOGGER.warn("Hit a no retry error code. Not retrying." + tries, e);
+ throw new IOException(e);
+ }
+ LOGGER.warn("client exception fetching on retry=" + tries, e);
+ ex = new IOException(e);
} catch (AmazonClientException e) {
- //TODO -- filter exceptions -- if the file doesn't exist, don't retry
LOGGER.warn("client exception fetching on retry=" + tries, e);
ex = new IOException(e);
} catch (IOException e) {
- //TODO -- filter exceptions -- if the file doesn't exist, don't retry
LOGGER.warn("client exception fetching on retry=" + tries, e);
ex = e;
}
- tries++;
- }
+ LOGGER.warn("sleeping for {} seconds before retry", throttleSeconds[tries]);
+ try {
+ Thread.sleep(throttleSeconds[tries]);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("interrupted");
+ }
+ LOGGER.debug("trying to re-initialize S3 client");
+ initialize(new HashMap<>());
+ } while (++tries < throttleSeconds.length);
+
throw ex;
}
@@ -171,7 +195,8 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
start = System.currentTimeMillis();
tmp = new TemporaryResources();
Path tmpPath = tmp.createTempFile(FilenameUtils.getSuffixFromPath(fetchKey));
- Files.copy(s3Object.getObjectContent(), tmpPath, StandardCopyOption.REPLACE_EXISTING);
+ Files.copy(s3Object.getObjectContent(), tmpPath,
+ StandardCopyOption.REPLACE_EXISTING);
TikaInputStream tis = TikaInputStream.get(tmpPath, metadata, tmp);
LOGGER.debug("took {} ms to fetch metadata and copy to local tmp file",
System.currentTimeMillis() - start);
@@ -205,6 +230,32 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
this.bucket = bucket;
}
+ /**
+ * Set seconds to throttle retries as a comma-delimited list, e.g.: 30,60,120,600
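+ * @param commaDelimitedLongs comma-delimited list of long values, e.g. "30,60,120,600"
+ * @throws TikaConfigException if any element of the list cannot be parsed as a long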
+ */
+ @Field
+ public void setThrottleSeconds(String commaDelimitedLongs) throws TikaConfigException {
+ String[] longStrings = commaDelimitedLongs.split(",");
+ long[] seconds = new long[longStrings.length];
+ for (int i = 0; i < longStrings.length; i++) {
+ try {
+ seconds[i] = Long.parseLong(longStrings[i]);
+ } catch (NumberFormatException e) {
+ throw new TikaConfigException(e.getMessage());
+ }
+ }
+ setThrottleSeconds(seconds);
+ }
+ public void setThrottleSeconds(long[] throttleSeconds) {
+ this.throttleSeconds = throttleSeconds;
+ }
+
+ public long[] getThrottleSeconds() {
+ return throttleSeconds;
+ }
+
/**
* prefix to prepend to the fetch key before fetching.
* This will automatically add a '/' at the end.
@@ -236,7 +287,11 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
}
@Field
+ /**
+ * @deprecated use {@link #setThrottleSeconds(String)} instead
+ */
public void setRetries(int retries) {
+ LOGGER.info("retries is deprecated. Use setThrottleSeconds instead.");
this.retries = retries;
}
@@ -255,8 +310,13 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
this.maxLength = maxLength;
}
+ /**
+ * @deprecated use {@link #setThrottleSeconds(String)}
+ * @param sleepBeforeRetryMillis -- amount of time in millis to sleep if there was a failure
+ */
@Field
public void setSleepBeforeRetryMillis(long sleepBeforeRetryMillis) {
+ LOGGER.info("sleepBeforeRetryMillis is deprecated. Use setThrottleSeconds instead");
this.sleepBeforeRetryMillis = sleepBeforeRetryMillis;
}
@@ -286,7 +346,8 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
} else if (credentialsProvider.equals("profile")) {
provider = new ProfileCredentialsProvider(profile);
} else if (credentialsProvider.equals("key_secret")) {
- provider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey));
+ provider =
+ new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey));
} else {
throw new TikaConfigException("credentialsProvider must be set and " +
"must be either 'instance', 'profile' or 'key_secret'");
@@ -307,11 +368,16 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
amazonS3ClientBuilder.withRegion(region);
}
s3Client = amazonS3ClientBuilder.build();
-
}
} catch (AmazonClientException e) {
throw new TikaConfigException("can't initialize s3 fetcher", e);
}
+ if (throttleSeconds == null) {
+ throttleSeconds = new long[retries];
+ for (int i = 0; i < retries; i++) {
+ throttleSeconds[i] = sleepBeforeRetryMillis * 1000;
+ }
+ }
}
@Override