You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2016/06/08 03:43:04 UTC

svn commit: r1747341 - /jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java

Author: amitj
Date: Wed Jun  8 03:43:04 2016
New Revision: 1747341

URL: http://svn.apache.org/viewvc?rev=1747341&view=rev
Log:
OAK-4429: [oak-blob-cloud] S3Backend#getAllIdentifiers should not store all elements in memory

Fixed by returning an iterator backed directly by the S3 list-objects API, which already paginates its results in batches of up to 1000 keys.

Modified:
    jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java

Modified: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java?rev=1747341&r1=1747340&r2=1747341&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java Wed Jun  8 03:43:04 2016
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -34,6 +35,10 @@ import java.util.concurrent.ThreadPoolEx
 import java.util.concurrent.TimeUnit;
 
 import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Lists;
 import org.apache.jackrabbit.core.data.AsyncTouchCallback;
 import org.apache.jackrabbit.core.data.AsyncTouchResult;
 import org.apache.jackrabbit.core.data.AsyncUploadCallback;
@@ -66,6 +71,9 @@ import com.amazonaws.services.s3.transfe
 import com.amazonaws.services.s3.transfer.Upload;
 import com.amazonaws.util.StringUtils;
 
+import static com.google.common.collect.Iterables.filter;
+import static java.lang.Thread.currentThread;
+
 /**
  * A data store backend that stores data on Amazon S3.
  */
@@ -411,33 +419,13 @@ public class S3Backend implements Shared
     @Override
     public Iterator<DataIdentifier> getAllIdentifiers()
             throws DataStoreException {
-        long start = System.currentTimeMillis();
-        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
-        try {
-            Thread.currentThread().setContextClassLoader(
-                getClass().getClassLoader());
-            Set<DataIdentifier> ids = new HashSet<DataIdentifier>();
-            ObjectListing prevObjectListing = s3service.listObjects(bucket);
-            while (true) {
-                for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
-                    String id = getIdentifierName(s3ObjSumm.getKey());
-                    if (id != null && !id.startsWith(META_KEY_PREFIX)) {
-                        ids.add(new DataIdentifier(id));
-                    }
+        return new RecordsIterator<DataIdentifier>( // lazy: the bucket listing is fetched page by page during iteration
+            new Function<S3ObjectSummary, DataIdentifier>() { // maps each listed S3 key to its DataIdentifier
+                @Override
+                public DataIdentifier apply(S3ObjectSummary input) {
+                    return new DataIdentifier(getIdentifierName(input.getKey())); // NOTE(review): removed loop skipped null names; confirm none reach here
                 }
-                if (!prevObjectListing.isTruncated()) break;
-                prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
-            }
-            LOG.debug("getAllIdentifiers returned size [{}] took [{}] ms.",
-                ids.size(), (System.currentTimeMillis() - start));
-            return ids.iterator();
-        } catch (AmazonServiceException e) {
-            throw new DataStoreException("Could not list objects", e);
-        } finally {
-            if (contextClassLoader != null) {
-                Thread.currentThread().setContextClassLoader(contextClassLoader);
-            }
-        }
+        }); // NOTE(review): listing failures no longer surface as DataStoreException from this call
     }
 
     @Override
@@ -705,6 +693,81 @@ public class S3Backend implements Shared
         }
     }
 
+    /**
+     * Lazily iterates over the records of the bucket, fetching one page of the S3 object
+     * listing at a time so that the complete listing is never held in memory. Metadata
+     * objects (keys starting with {@code META_KEY_PREFIX}) and keys for which
+     * {@code getIdentifierName} returns {@code null} are skipped.
+     * @param <T> the element type the transformer produces from each object summary
+     */
+    class RecordsIterator<T> extends AbstractIterator<T> {
+        ObjectListing prevObjectListing; // last page fetched; null before the first load
+        Queue<S3ObjectSummary> queue; // filtered summaries not yet handed out
+        long size; // number of records queued so far
+        Function<S3ObjectSummary, T> transformer;
+
+        public RecordsIterator (Function<S3ObjectSummary, T> transformer) {
+            queue = Lists.newLinkedList();
+            this.transformer = transformer;
+        }
+
+        @Override
+        protected T computeNext() {
+            if (queue.isEmpty()) {
+                loadBatch();
+            }
+            return queue.isEmpty() ? endOfData() : transformer.apply(queue.remove());
+        }
+
+        /**
+         * Fetches listing pages until at least one record survives filtering or the listing
+         * is exhausted. Pages whose objects are all filtered out are skipped rather than
+         * mistaken for the end of the listing.
+         * @return {@code true} if records were queued; {@code false} if the listing is
+         *         exhausted or failed with an {@code AmazonServiceException} (logged only)
+         */
+        private boolean loadBatch() {
+            ClassLoader contextClassLoader = currentThread().getContextClassLoader();
+            long start = System.currentTimeMillis();
+            try {
+                currentThread().setContextClassLoader(getClass().getClassLoader());
+                while (true) {
+                    if (prevObjectListing == null) { // first call: start the listing
+                        prevObjectListing = s3service.listObjects(bucket);
+                    } else if (prevObjectListing.isTruncated()) { // more pages available
+                        prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
+                    } else { // listing exhausted
+                        return false;
+                    }
+                    List<S3ObjectSummary> listing = Lists.newArrayList(
+                        filter(prevObjectListing.getObjectSummaries(),
+                            new Predicate<S3ObjectSummary>() {
+                                @Override
+                                public boolean apply(S3ObjectSummary input) {
+                                    // mirrors the checks of the former eager implementation
+                                    return !input.getKey().startsWith(META_KEY_PREFIX)
+                                        && getIdentifierName(input.getKey()) != null;
+                                }
+                            }));
+                    if (!listing.isEmpty()) {
+                        size += listing.size();
+                        queue.addAll(listing);
+                        LOG.debug("Loaded batch of size [{}] in [{}] ms.",
+                            listing.size(), (System.currentTimeMillis() - start));
+                        return true;
+                    }
+                    // every object on this page was filtered out: fetch the next page
+                }
+            } catch (AmazonServiceException e) {
+                LOG.warn("Could not list objects", e);
+                return false;
+            } finally {
+                // restore unconditionally; null is a legal context class loader to put back
+                currentThread().setContextClassLoader(contextClassLoader);
+            }
+        }
+    }
+
     private static String addMetaKeyPrefix(String key) {
         return META_KEY_PREFIX + key;
     }