You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2016/06/23 05:14:44 UTC
svn commit: r1749818 - in /jackrabbit/oak/branches/1.4: ./
oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java
Author: amitj
Date: Thu Jun 23 05:14:44 2016
New Revision: 1749818
URL: http://svn.apache.org/viewvc?rev=1749818&view=rev
Log:
OAK-4429: [oak-blob-cloud] S3Backend#getAllIdentifiers should not store all elements in memory
Modified:
jackrabbit/oak/branches/1.4/ (props changed)
jackrabbit/oak/branches/1.4/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java
Propchange: jackrabbit/oak/branches/1.4/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jun 23 05:14:44 2016
@@ -1,3 +1,3 @@
/jackrabbit/oak/branches/1.0:1665962
-/jackrabbit/oak/trunk:1733615,1733875,1733913,1733929,1734230,1734254,1734279,1734941,1735052,1735405,1735484,1735549,1735564,1735588,1735622,1735638,1735919,1735983,1736176,1737309-1737310,1737334,1737349,1737998,1738004,1738775,1738795,1738833,1738950,1738957,1738963,1739894,1740116,1740625-1740626,1740971,1741032,1741339,1741343,1742520,1742888,1742916,1743097,1743172,1743343,1744265,1744959,1745038,1745197,1745368,1746086,1746117,1746342,1746345,1746696,1746981,1747492,1748505,1748553,1748870,1749275,1749350,1749464,1749475,1749662
+/jackrabbit/oak/trunk:1733615,1733875,1733913,1733929,1734230,1734254,1734279,1734941,1735052,1735405,1735484,1735549,1735564,1735588,1735622,1735638,1735919,1735983,1736176,1737309-1737310,1737334,1737349,1737998,1738004,1738775,1738795,1738833,1738950,1738957,1738963,1739894,1740116,1740625-1740626,1740971,1741032,1741339,1741343,1742520,1742888,1742916,1743097,1743172,1743343,1744265,1744959,1745038,1745197,1745368,1746086,1746117,1746342,1746345,1746696,1746981,1747341,1747492,1748505,1748553,1748870,1749275,1749350,1749464,1749475,1749662
/jackrabbit/trunk:1345480
Modified: jackrabbit/oak/branches/1.4/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java?rev=1749818&r1=1749817&r2=1749818&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java (original)
+++ jackrabbit/oak/branches/1.4/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java Thu Jun 23 05:14:44 2016
@@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -34,6 +35,10 @@ import java.util.concurrent.ThreadPoolEx
import java.util.concurrent.TimeUnit;
import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Lists;
import org.apache.jackrabbit.core.data.AsyncTouchCallback;
import org.apache.jackrabbit.core.data.AsyncTouchResult;
import org.apache.jackrabbit.core.data.AsyncUploadCallback;
@@ -66,6 +71,9 @@ import com.amazonaws.services.s3.transfe
import com.amazonaws.services.s3.transfer.Upload;
import com.amazonaws.util.StringUtils;
+import static com.google.common.collect.Iterables.filter;
+import static java.lang.Thread.currentThread;
+
/**
* A data store backend that stores data on Amazon S3.
*/
@@ -409,33 +417,13 @@ public class S3Backend implements Shared
@Override
public Iterator<DataIdentifier> getAllIdentifiers()
throws DataStoreException {
- long start = System.currentTimeMillis();
- ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
- try {
- Thread.currentThread().setContextClassLoader(
- getClass().getClassLoader());
- Set<DataIdentifier> ids = new HashSet<DataIdentifier>();
- ObjectListing prevObjectListing = s3service.listObjects(bucket);
- while (true) {
- for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
- String id = getIdentifierName(s3ObjSumm.getKey());
- if (id != null && !id.startsWith(META_KEY_PREFIX)) {
- ids.add(new DataIdentifier(id));
- }
+ return new RecordsIterator<DataIdentifier>( // identifiers are now produced lazily, one listing page at a time
+ new Function<S3ObjectSummary, DataIdentifier>() {
+ @Override
+ public DataIdentifier apply(S3ObjectSummary input) {
+ return new DataIdentifier(getIdentifierName(input.getKey())); // NOTE(review): the removed loop skipped ids for which getIdentifierName() returned null; confirm null cannot reach this point
}
- if (!prevObjectListing.isTruncated()) break;
- prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
- }
- LOG.debug("getAllIdentifiers returned size [{}] took [{}] ms.",
- ids.size(), (System.currentTimeMillis() - start));
- return ids.iterator();
- } catch (AmazonServiceException e) {
- throw new DataStoreException("Could not list objects", e);
- } finally {
- if (contextClassLoader != null) {
- Thread.currentThread().setContextClassLoader(contextClassLoader);
- }
- }
+ }); // NOTE(review): listing failures are logged inside RecordsIterator.loadBatch() and are no longer raised as DataStoreException
}
@Override
@@ -703,6 +691,98 @@ public class S3Backend implements Shared
}
}
+    /**
+     * Iterates lazily over the objects in the S3 bucket, fetching one listing
+     * page at a time rather than collecting every key up front. Keys that start
+     * with {@code META_KEY_PREFIX} are skipped; each remaining object summary is
+     * passed through the supplied transformer before being returned.
+     *
+     * @param <T> the element type produced by the transformer
+     */
+    class RecordsIterator<T> extends AbstractIterator<T> {
+        /** Last listing page fetched; {@code null} until the first fetch. */
+        ObjectListing prevObjectListing;
+        /** Filtered summaries already fetched but not yet returned. */
+        Queue<S3ObjectSummary> queue;
+        /** Running total of summaries queued so far. */
+        long size;
+        /** Converts a queued summary into an element of this iterator. */
+        Function<S3ObjectSummary, T> transformer;
+
+        public RecordsIterator(Function<S3ObjectSummary, T> transformer) {
+            queue = Lists.newLinkedList();
+            this.transformer = transformer;
+        }
+
+        @Override
+        protected T computeNext() {
+            // A page can consist solely of filtered-out keys while the listing is
+            // still truncated, so keep fetching until something is queued or no
+            // further page can be loaded.
+            while (queue.isEmpty() && loadBatch()) {
+                // page fetched; the loop condition re-checks the queue
+            }
+
+            if (!queue.isEmpty()) {
+                return transformer.apply(queue.remove());
+            }
+
+            return endOfData();
+        }
+
+        /**
+         * Fetches the next listing page and queues its entries that pass the filter.
+         *
+         * @return {@code true} if a page was fetched, even when nothing survived
+         *         filtering; {@code false} if the listing is exhausted or the
+         *         request failed
+         */
+        private boolean loadBatch() {
+            ClassLoader contextClassLoader = currentThread().getContextClassLoader();
+            long start = System.currentTimeMillis();
+            try {
+                // run the S3 calls with the class loader of this class as context loader
+                currentThread().setContextClassLoader(getClass().getClassLoader());
+
+                // initialize the listing the first time
+                if (prevObjectListing == null) {
+                    prevObjectListing = s3service.listObjects(bucket);
+                } else if (prevObjectListing.isTruncated()) { // already initialized, more objects available
+                    prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
+                } else { // no more available
+                    return false;
+                }
+
+                List<S3ObjectSummary> listing = Lists.newArrayList(
+                    filter(prevObjectListing.getObjectSummaries(),
+                        new Predicate<S3ObjectSummary>() {
+                            @Override
+                            public boolean apply(S3ObjectSummary input) {
+                                return !input.getKey().startsWith(META_KEY_PREFIX);
+                            }
+                        }));
+
+                // An empty result after filtering does not mean the listing is over,
+                // so it is still reported as a successful fetch.
+                size += listing.size();
+                queue.addAll(listing);
+
+                LOG.info("Loaded batch of size [{}] in [{}] ms.",
+                    listing.size(), (System.currentTimeMillis() - start));
+
+                return true;
+            } catch (AmazonServiceException e) {
+                // NOTE(review): a failed listing is only logged and ends the iteration
+                LOG.warn("Could not list objects", e);
+            } finally {
+                if (contextClassLoader != null) {
+                    currentThread().setContextClassLoader(contextClassLoader);
+                }
+            }
+            return false;
+        }
+    }
+
private static String addMetaKeyPrefix(String key) {
return META_KEY_PREFIX + key;
}