You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2016/06/23 05:14:44 UTC

svn commit: r1749818 - in /jackrabbit/oak/branches/1.4: ./ oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java

Author: amitj
Date: Thu Jun 23 05:14:44 2016
New Revision: 1749818

URL: http://svn.apache.org/viewvc?rev=1749818&view=rev
Log:
OAK-4429: [oak-blob-cloud] S3Backend#getAllIdentifiers should not store all elements in memory

Modified:
    jackrabbit/oak/branches/1.4/   (props changed)
    jackrabbit/oak/branches/1.4/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java

Propchange: jackrabbit/oak/branches/1.4/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jun 23 05:14:44 2016
@@ -1,3 +1,3 @@
 /jackrabbit/oak/branches/1.0:1665962
-/jackrabbit/oak/trunk:1733615,1733875,1733913,1733929,1734230,1734254,1734279,1734941,1735052,1735405,1735484,1735549,1735564,1735588,1735622,1735638,1735919,1735983,1736176,1737309-1737310,1737334,1737349,1737998,1738004,1738775,1738795,1738833,1738950,1738957,1738963,1739894,1740116,1740625-1740626,1740971,1741032,1741339,1741343,1742520,1742888,1742916,1743097,1743172,1743343,1744265,1744959,1745038,1745197,1745368,1746086,1746117,1746342,1746345,1746696,1746981,1747492,1748505,1748553,1748870,1749275,1749350,1749464,1749475,1749662
+/jackrabbit/oak/trunk:1733615,1733875,1733913,1733929,1734230,1734254,1734279,1734941,1735052,1735405,1735484,1735549,1735564,1735588,1735622,1735638,1735919,1735983,1736176,1737309-1737310,1737334,1737349,1737998,1738004,1738775,1738795,1738833,1738950,1738957,1738963,1739894,1740116,1740625-1740626,1740971,1741032,1741339,1741343,1742520,1742888,1742916,1743097,1743172,1743343,1744265,1744959,1745038,1745197,1745368,1746086,1746117,1746342,1746345,1746696,1746981,1747341,1747492,1748505,1748553,1748870,1749275,1749350,1749464,1749475,1749662
 /jackrabbit/trunk:1345480

Modified: jackrabbit/oak/branches/1.4/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java?rev=1749818&r1=1749817&r2=1749818&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java (original)
+++ jackrabbit/oak/branches/1.4/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java Thu Jun 23 05:14:44 2016
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -34,6 +35,10 @@ import java.util.concurrent.ThreadPoolEx
 import java.util.concurrent.TimeUnit;
 
 import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Lists;
 import org.apache.jackrabbit.core.data.AsyncTouchCallback;
 import org.apache.jackrabbit.core.data.AsyncTouchResult;
 import org.apache.jackrabbit.core.data.AsyncUploadCallback;
@@ -66,6 +71,9 @@ import com.amazonaws.services.s3.transfe
 import com.amazonaws.services.s3.transfer.Upload;
 import com.amazonaws.util.StringUtils;
 
+import static com.google.common.collect.Iterables.filter;
+import static java.lang.Thread.currentThread;
+
 /**
  * A data store backend that stores data on Amazon S3.
  */
@@ -409,33 +417,13 @@ public class S3Backend implements Shared
     @Override
     public Iterator<DataIdentifier> getAllIdentifiers()
             throws DataStoreException {
-        long start = System.currentTimeMillis();
-        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
-        try {
-            Thread.currentThread().setContextClassLoader(
-                getClass().getClassLoader());
-            Set<DataIdentifier> ids = new HashSet<DataIdentifier>();
-            ObjectListing prevObjectListing = s3service.listObjects(bucket);
-            while (true) {
-                for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
-                    String id = getIdentifierName(s3ObjSumm.getKey());
-                    if (id != null && !id.startsWith(META_KEY_PREFIX)) {
-                        ids.add(new DataIdentifier(id));
-                    }
+        return new RecordsIterator<DataIdentifier>(
+            new Function<S3ObjectSummary, DataIdentifier>() {
+                @Override
+                public DataIdentifier apply(S3ObjectSummary input) {
+                    return new DataIdentifier(getIdentifierName(input.getKey()));
                 }
-                if (!prevObjectListing.isTruncated()) break;
-                prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
-            }
-            LOG.debug("getAllIdentifiers returned size [{}] took [{}] ms.",
-                ids.size(), (System.currentTimeMillis() - start));
-            return ids.iterator();
-        } catch (AmazonServiceException e) {
-            throw new DataStoreException("Could not list objects", e);
-        } finally {
-            if (contextClassLoader != null) {
-                Thread.currentThread().setContextClassLoader(contextClassLoader);
-            }
-        }
+        });
     }
 
     @Override
@@ -703,6 +691,81 @@ public class S3Backend implements Shared
         }
     }
 
+    /**
+     * Iterates the bucket's S3 object listing one batch at a time, skipping keys that start with META_KEY_PREFIX.
+     * @param <T> the element type produced by applying the transformer to each remaining S3ObjectSummary
+     */
+    class RecordsIterator<T> extends AbstractIterator<T> {
+        ObjectListing prevObjectListing; // most recent listing fetched; null until the first batch is loaded
+        Queue<S3ObjectSummary> queue; // summaries of the current batch that have not been handed out yet
+        long size; // running total of summaries queued by loadBatch()
+        Function<S3ObjectSummary, T> transformer; // converts a queued summary into the returned element
+
+        public RecordsIterator (Function<S3ObjectSummary, T> transformer) {
+            queue = Lists.newLinkedList();
+            this.transformer = transformer;
+        }
+
+        @Override
+        protected T computeNext() {
+            if (queue.isEmpty()) { // current batch exhausted (or nothing loaded yet): try to fetch another
+                loadBatch();
+            }
+
+            if (!queue.isEmpty()) {
+                return transformer.apply(queue.remove());
+            }
+
+            return endOfData(); // loadBatch() queued nothing, so iteration stops here
+        }
+
+        private boolean loadBatch() { // returns true only when at least one summary was added to the queue
+            ClassLoader contextClassLoader = currentThread().getContextClassLoader();
+            long start = System.currentTimeMillis();
+            try {
+                currentThread().setContextClassLoader(getClass().getClassLoader()); // restored in the finally block
+
+                // first call: fetch the initial listing for the bucket
+                if (prevObjectListing == null) {
+                    prevObjectListing = s3service.listObjects(bucket);
+                } else if (prevObjectListing.isTruncated()) { // previous listing was truncated: fetch the next batch
+                    prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
+                } else { // previous listing was not truncated: nothing left to load
+                    return false;
+                }
+
+                List<S3ObjectSummary> listing = Lists.newArrayList(
+                    filter(prevObjectListing.getObjectSummaries(),
+                        new Predicate<S3ObjectSummary>() {
+                            @Override
+                            public boolean apply(S3ObjectSummary input) {
+                                return !input.getKey().startsWith(META_KEY_PREFIX);
+                            }
+                        }));
+
+                // NOTE(review): an all-filtered batch returns false even if the listing is truncated, so computeNext() ends early
+                if (listing.isEmpty()) {
+                    return false;
+                }
+
+                size += listing.size();
+                queue.addAll(listing);
+
+                LOG.info("Loaded batch of size [{}] in [{}] ms.",
+                    listing.size(), (System.currentTimeMillis() - start));
+
+                return true;
+            } catch (AmazonServiceException e) {
+                LOG.warn("Could not list objects", e); // NOTE(review): only logged; falls through to return false, so computeNext() returns endOfData()
+            } finally {
+                if (contextClassLoader != null) {
+                    currentThread().setContextClassLoader(contextClassLoader);
+                }
+            }
+            return false;
+        }
+    }
+
     private static String addMetaKeyPrefix(String key) {
         return META_KEY_PREFIX + key;
     }