You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2022/08/05 21:22:57 UTC

[tika] branch main updated: TIKA-3831 -- allow for retries in S3Fetcher

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new f50d514a4 TIKA-3831 -- allow for retries in S3Fetcher
f50d514a4 is described below

commit f50d514a47e751fca17c18fa13457f2051123b23
Author: tballison <ta...@apache.org>
AuthorDate: Fri Aug 5 17:22:50 2022 -0400

    TIKA-3831 -- allow for retries in S3Fetcher
---
 .../apache/tika/pipes/fetcher/s3/S3Fetcher.java    | 120 ++++++++++++++-------
 1 file changed, 79 insertions(+), 41 deletions(-)

diff --git a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
index 0e8b19012..777ece96b 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
@@ -20,8 +20,11 @@ import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
 import java.util.Map;
-import java.util.regex.Pattern;
 
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.ClientConfiguration;
@@ -41,6 +44,7 @@ import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.config.Param;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.exception.TikaException;
+import org.apache.tika.io.TemporaryResources;
 import org.apache.tika.io.TikaInputStream;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.fetcher.AbstractFetcher;
@@ -48,14 +52,16 @@ import org.apache.tika.pipes.fetcher.RangeFetcher;
 import org.apache.tika.utils.StringUtils;
 
 /**
- * Fetches files from s3. Example string: s3://my_bucket/path/to/my_file.pdf
+ * Fetches files from s3. Example file: s3://my_bucket/path/to/my_file.pdf
+ * The bucket must be specified via the tika-config or before
+ * initialization, and the fetch key is "path/to/my_file.pdf".
  * This will parse the bucket out of that string and retrieve the path.
  */
 public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFetcher {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(S3Fetcher.class);
     private static final String PREFIX = "s3";
-    private static final Pattern RANGE_PATTERN = Pattern.compile("\\A(.*?):(\\d+):(\\d+)\\Z");
+    private static final Object[] LOCK = new Object[0]; // one lock for all instances
     private String region;
     private String bucket;
     private String profile;
@@ -65,48 +71,66 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
     private int maxConnections = ClientConfiguration.DEFAULT_MAX_CONNECTIONS;
     private AmazonS3 s3Client;
     private boolean spoolToTemp = true;
+    private int retries = 0;
 
     @Override
     public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
+        return fetch(fetchKey, -1, -1, metadata);
+    }
 
+    @Override
+    public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata metadata)
+            throws TikaException, IOException {
         String theFetchKey = StringUtils.isBlank(prefix) ? fetchKey : prefix + fetchKey;
-        LOGGER.debug("about to fetch fetchkey={} from bucket ({})", theFetchKey, bucket);
 
-        try {
-            S3Object s3Object = s3Client.getObject(new GetObjectRequest(bucket, theFetchKey));
-            if (extractUserMetadata) {
-                for (Map.Entry<String, String> e : s3Object.getObjectMetadata().getUserMetadata()
-                        .entrySet()) {
-                    metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
+        if (startRange > -1) {
+            LOGGER.debug("about to fetch fetchkey={} (start={} end={}) from bucket ({})",
+                    theFetchKey, startRange, endRange, bucket);
+        } else {
+            LOGGER.debug("about to fetch fetchkey={} from bucket ({})", theFetchKey, bucket);
+        }
+        // tries counts attempts; only attempts after the first sleep and rebuild the client
+        int tries = 0;
+        IOException ex = null;
+        while (tries++ <= retries) {
+            if (tries > 1) {
+                try {
+                    Thread.sleep(30000);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException("interrupted", e);
                 }
+                LOGGER.debug("slept 30 seconds before retry; re-initializing S3 client");
+                initialize(new HashMap<>());
             }
-            if (!spoolToTemp) {
-                return TikaInputStream.get(s3Object.getObjectContent());
-            } else {
-                long start = System.currentTimeMillis();
-                TikaInputStream tis = TikaInputStream.get(s3Object.getObjectContent());
-                tis.getPath();
-                long elapsed = System.currentTimeMillis() - start;
-                LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
-                return tis;
+            //TODO -- filter exceptions -- if the file doesn't exist, don't retry
+            try {
+                return _fetch(theFetchKey, metadata, startRange, endRange);
+            } catch (AmazonClientException e) {
+                LOGGER.warn("failed to fetch {}; try {}/{}", theFetchKey, tries, retries + 1, e);
+                ex = new IOException(e);
+            } catch (IOException e) {
+                LOGGER.warn("failed to fetch {}; try {}/{}", theFetchKey, tries, retries + 1, e);
+                ex = e;
             }
-        } catch (AmazonClientException e) {
-            throw new IOException("s3 client exception", e);
         }
+        throw ex;
     }
 
-    @Override
-    public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata metadata)
-            throws TikaException, IOException {
-        String theFetchKey = StringUtils.isBlank(prefix) ? fetchKey : prefix + fetchKey;
-        //TODO -- figure out how to integrate this
-        LOGGER.debug("about to fetch fetchkey={} (start={} end={}) from bucket ({})",
-                theFetchKey, startRange, endRange, bucket);
-
+    // Issues one GetObject request (ranged only if both bounds are > -1); fetch() does retries.
+    private InputStream _fetch(String fetchKey, Metadata metadata,
+                               Long startRange, Long endRange) throws IOException {
+        TemporaryResources tmp = null;
         try {
-            S3Object s3Object = s3Client.getObject(
-                    new GetObjectRequest(bucket, theFetchKey).withRange(startRange, endRange));
-
+            GetObjectRequest objectRequest = new GetObjectRequest(bucket, fetchKey);
+            if (startRange != null && endRange != null
+                    && startRange > -1 && endRange > -1) {
+                objectRequest.withRange(startRange, endRange);
+            }
+            S3Object s3Object = null;
+            synchronized (LOCK) {
+                s3Object = s3Client.getObject(objectRequest);
+            }
             if (extractUserMetadata) {
                 for (Map.Entry<String, String> e : s3Object.getObjectMetadata().getUserMetadata()
                         .entrySet()) {
@@ -117,14 +141,21 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
                 return TikaInputStream.get(s3Object.getObjectContent());
             } else {
                 long start = System.currentTimeMillis();
-                TikaInputStream tis = TikaInputStream.get(s3Object.getObjectContent());
-                tis.getPath();
+                tmp = new TemporaryResources();
+                Path tmpPath = tmp.createTempFile();
+                try (InputStream s3Stream = s3Object.getObjectContent()) {
+                    Files.copy(s3Stream, tmpPath, StandardCopyOption.REPLACE_EXISTING);
+                }
+                TikaInputStream tis = TikaInputStream.get(tmpPath, metadata, tmp);
                 long elapsed = System.currentTimeMillis() - start;
                 LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
                 return tis;
             }
-        } catch (AmazonClientException e) {
-            throw new IOException(e);
+        } catch (Throwable e) {
+            if (tmp != null) {
+                tmp.close();
+            }
+            throw e;
         }
     }
 
@@ -176,11 +207,16 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
         this.maxConnections = maxConnections;
     }
 
+    @Field
+    public void setRetries(int retries) {
+        this.retries = Math.max(0, retries); // negative: fetch() would make no attempt at all
+    }
+
     @Field
     public void setCredentialsProvider(String credentialsProvider) {
         if (!credentialsProvider.equals("profile") && !credentialsProvider.equals("instance")) {
             throw new IllegalArgumentException(
-                    "credentialsProvider must be either 'profile' or instance'");
+                    "credentialsProvider must be either 'profile' or 'instance'");
         }
         this.credentialsProvider = credentialsProvider;
     }
@@ -207,10 +243,12 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
         ClientConfiguration clientConfiguration = new ClientConfiguration()
                         .withMaxConnections(maxConnections);
         try {
-            s3Client = AmazonS3ClientBuilder.standard()
-                    .withClientConfiguration(clientConfiguration)
-                    .withRegion(region).withCredentials(provider)
-                    .build();
+            synchronized (LOCK) { // same lock as the getObject() call in _fetch()
+                s3Client = AmazonS3ClientBuilder.standard()
+                        .withClientConfiguration(clientConfiguration)
+                        .withRegion(region)
+                        .withCredentials(provider).build();
+            }
         } catch (AmazonClientException e) {
             throw new TikaConfigException("can't initialize s3 fetcher", e);
         }