Posted to commits@tika.apache.org by ta...@apache.org on 2022/08/05 21:22:57 UTC
[tika] branch main updated: TIKA-3831 -- allow for retries in S3Fetcher
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new f50d514a4 TIKA-3831 -- allow for retries in S3Fetcher
f50d514a4 is described below
commit f50d514a47e751fca17c18fa13457f2051123b23
Author: tballison <ta...@apache.org>
AuthorDate: Fri Aug 5 17:22:50 2022 -0400
TIKA-3831 -- allow for retries in S3Fetcher
---
.../apache/tika/pipes/fetcher/s3/S3Fetcher.java | 120 ++++++++++++++-------
1 file changed, 79 insertions(+), 41 deletions(-)
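For context, here is a minimal sketch (not part of the commit) of how the new retry setting might be exercised when driving the fetcher directly from Java. Only setRetries(), setCredentialsProvider(), and setMaxConnections() appear in the diff below; setBucket() and setRegion() are assumed to be the usual @Field setters for the corresponding fields.

import java.io.InputStream;
import java.util.HashMap;

import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.fetcher.s3.S3Fetcher;

public class S3FetcherRetryExample {
    public static void main(String[] args) throws Exception {
        S3Fetcher fetcher = new S3Fetcher();
        fetcher.setBucket("my_bucket");             // assumed @Field setter for 'bucket'
        fetcher.setRegion("us-east-1");             // assumed @Field setter for 'region'
        fetcher.setCredentialsProvider("instance"); // must be 'profile' or 'instance'
        fetcher.setRetries(3);                      // new in this commit: retry failed fetches up to 3 times
        fetcher.initialize(new HashMap<>());        // builds the AmazonS3 client
        try (InputStream is = fetcher.fetch("path/to/my_file.pdf", new Metadata())) {
            // with spoolToTemp=true (the default) the stream is backed by a local temp file
            System.out.println("fetched bytes available: " + is.available());
        }
    }
}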
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
index 0e8b19012..777ece96b 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
@@ -20,8 +20,11 @@ import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
import java.util.Map;
-import java.util.regex.Pattern;
import com.amazonaws.AmazonClientException;
import com.amazonaws.ClientConfiguration;
@@ -41,6 +44,7 @@ import org.apache.tika.config.InitializableProblemHandler;
import org.apache.tika.config.Param;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.exception.TikaException;
+import org.apache.tika.io.TemporaryResources;
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.fetcher.AbstractFetcher;
@@ -48,14 +52,16 @@ import org.apache.tika.pipes.fetcher.RangeFetcher;
import org.apache.tika.utils.StringUtils;
/**
- * Fetches files from s3. Example string: s3://my_bucket/path/to/my_file.pdf
+ * Fetches files from s3. Example file: s3://my_bucket/path/to/my_file.pdf
+ * The bucket must be specified via the tika-config or before
+ * initialization, and the fetch key is "path/to/my_file.pdf".
* This will parse the bucket out of that string and retrieve the path.
*/
public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFetcher {
private static final Logger LOGGER = LoggerFactory.getLogger(S3Fetcher.class);
private static final String PREFIX = "s3";
- private static final Pattern RANGE_PATTERN = Pattern.compile("\\A(.*?):(\\d+):(\\d+)\\Z");
+ private static final Object[] LOCK = new Object[0];
private String region;
private String bucket;
private String profile;
@@ -65,48 +71,68 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
private int maxConnections = ClientConfiguration.DEFAULT_MAX_CONNECTIONS;
private AmazonS3 s3Client;
private boolean spoolToTemp = true;
+ private int retries = 0;
@Override
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
+ return fetch(fetchKey, -1, -1, metadata);
+ }
+ @Override
+ public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata metadata)
+ throws TikaException, IOException {
String theFetchKey = StringUtils.isBlank(prefix) ? fetchKey : prefix + fetchKey;
- LOGGER.debug("about to fetch fetchkey={} from bucket ({})", theFetchKey, bucket);
- try {
- S3Object s3Object = s3Client.getObject(new GetObjectRequest(bucket, theFetchKey));
- if (extractUserMetadata) {
- for (Map.Entry<String, String> e : s3Object.getObjectMetadata().getUserMetadata()
- .entrySet()) {
- metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
+ if (LOGGER.isDebugEnabled()) {
+ if (startRange > -1) {
+ LOGGER.debug("about to fetch fetchkey={} (start={} end={}) from bucket ({})",
+ theFetchKey, startRange, endRange, bucket);
+ } else {
+ LOGGER.debug("about to fetch fetchkey={} from bucket ({})",
+ theFetchKey, bucket);
+ }
+ }
+ int tries = 0;
+ IOException ex = null;
+ while (tries++ <= retries) {
+ if (tries > 0) {
+ LOGGER.debug("sleeping for 30 seconds before retry");
+ try {
+ Thread.sleep(30000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("interrupted");
}
+ LOGGER.debug("re-initializing S3 client");
+ initialize(new HashMap<>());
}
- if (!spoolToTemp) {
- return TikaInputStream.get(s3Object.getObjectContent());
- } else {
- long start = System.currentTimeMillis();
- TikaInputStream tis = TikaInputStream.get(s3Object.getObjectContent());
- tis.getPath();
- long elapsed = System.currentTimeMillis() - start;
- LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
- return tis;
+ try {
+ return _fetch(theFetchKey, metadata, startRange, endRange);
+ } catch (AmazonClientException e ) {
+ //TODO -- filter exceptions -- if the file doesn't exist, don't retry
+ LOGGER.warn("client exception fetching on retry=" + tries, e);
+ ex = new IOException(e);
+ } catch (IOException e) {
+ //TODO -- filter exceptions -- if the file doesn't exist, don't retry
+ LOGGER.warn("client exception fetching on retry=" + tries, e);
+ ex = e;
}
- } catch (AmazonClientException e) {
- throw new IOException("s3 client exception", e);
}
+ throw ex;
}
- @Override
- public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata metadata)
- throws TikaException, IOException {
- String theFetchKey = StringUtils.isBlank(prefix) ? fetchKey : prefix + fetchKey;
- //TODO -- figure out how to integrate this
- LOGGER.debug("about to fetch fetchkey={} (start={} end={}) from bucket ({})",
- theFetchKey, startRange, endRange, bucket);
-
+ private InputStream _fetch(String fetchKey, Metadata metadata,
+ Long startRange, Long endRange) throws IOException {
+ TemporaryResources tmp = null;
try {
- S3Object s3Object = s3Client.getObject(
- new GetObjectRequest(bucket, theFetchKey).withRange(startRange, endRange));
-
+ GetObjectRequest objectRequest = new GetObjectRequest(bucket, fetchKey);
+ if (startRange != null && endRange != null
+ && startRange > -1 && endRange > -1) {
+ objectRequest.withRange(startRange, endRange);
+ }
+ S3Object s3Object = null;
+ synchronized (LOCK) {
+ s3Object = s3Client.getObject(objectRequest);
+ }
if (extractUserMetadata) {
for (Map.Entry<String, String> e : s3Object.getObjectMetadata().getUserMetadata()
.entrySet()) {
@@ -117,14 +143,19 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
return TikaInputStream.get(s3Object.getObjectContent());
} else {
long start = System.currentTimeMillis();
- TikaInputStream tis = TikaInputStream.get(s3Object.getObjectContent());
- tis.getPath();
+ tmp = new TemporaryResources();
+ Path tmpPath = tmp.createTempFile();
+ Files.copy(s3Object.getObjectContent(), tmpPath, StandardCopyOption.REPLACE_EXISTING);
+ TikaInputStream tis = TikaInputStream.get(tmpPath, metadata, tmp);
long elapsed = System.currentTimeMillis() - start;
LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
return tis;
}
- } catch (AmazonClientException e) {
- throw new IOException(e);
+ } catch (Throwable e) {
+ if (tmp != null) {
+ tmp.close();
+ }
+ throw e;
}
}
@@ -176,11 +207,16 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
this.maxConnections = maxConnections;
}
+ @Field
+ public void setRetries(int retries) {
+ this.retries = retries;
+ }
+
@Field
public void setCredentialsProvider(String credentialsProvider) {
if (!credentialsProvider.equals("profile") && !credentialsProvider.equals("instance")) {
throw new IllegalArgumentException(
- "credentialsProvider must be either 'profile' or instance'");
+ "credentialsProvider must be either 'profile' or 'instance'");
}
this.credentialsProvider = credentialsProvider;
}
@@ -207,10 +243,12 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
ClientConfiguration clientConfiguration = new ClientConfiguration()
.withMaxConnections(maxConnections);
try {
- s3Client = AmazonS3ClientBuilder.standard()
- .withClientConfiguration(clientConfiguration)
- .withRegion(region).withCredentials(provider)
- .build();
+ synchronized (LOCK) {
+ s3Client = AmazonS3ClientBuilder.standard()
+ .withClientConfiguration(clientConfiguration)
+ .withRegion(region)
+ .withCredentials(provider).build();
+ }
} catch (AmazonClientException e) {
throw new TikaConfigException("can't initialize s3 fetcher", e);
}