You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by re...@apache.org on 2017/05/08 15:25:35 UTC

svn commit: r1794393 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document: BlobReferenceIterator.java DocumentBlobReferenceRetriever.java util/Utils.java

Author: reschke
Date: Mon May  8 15:25:35 2017
New Revision: 1794393

URL: http://svn.apache.org/viewvc?rev=1794393&view=rev
Log:
OAK-6162: BlobReferenceIterator refactoring

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIterator.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBlobReferenceRetriever.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIterator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIterator.java?rev=1794393&r1=1794392&r2=1794393&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIterator.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIterator.java Mon May  8 15:25:35 2017
@@ -16,96 +16,76 @@
  */
 package org.apache.jackrabbit.oak.plugins.document;
 
-import java.util.HashSet;
+import java.io.Closeable;
 import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
+import java.util.Queue;
 
 import org.apache.jackrabbit.oak.plugins.blob.ReferencedBlob;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Queues;
 
 /**
  * An iterator over all referenced binaries.
  * <p>
- * Only top-level referenced are returned (indirection, if any, is not resolved).
- * The items are returned in no particular order.
- * An item might be returned multiple times.
+ * Only top-level references are returned (indirection, if any, is not
+ * resolved). The items are returned in no particular order. An item might be
+ * returned multiple times.
  */
-public class BlobReferenceIterator implements Iterator<ReferencedBlob> {
+public class BlobReferenceIterator extends AbstractIterator<ReferencedBlob> implements Closeable {
 
-    private static final int BATCH_SIZE = 1000;
-    private final DocumentStore docStore;
+    private final DocumentStore documentStore;
     private final BlobCollector blobCollector;
-    private HashSet<ReferencedBlob> batch = new HashSet<ReferencedBlob>();
-    private Iterator<ReferencedBlob> batchIterator;
-    private boolean done;
-    private String fromKey = NodeDocument.MIN_ID_VALUE;
+    private final Queue<ReferencedBlob> blobs = Queues.newArrayDeque();
+
+    private Iterator<NodeDocument> iterator;
 
     public BlobReferenceIterator(DocumentNodeStore nodeStore) {
-        this.docStore = nodeStore.getDocumentStore();
-        batchIterator = batch.iterator();
+        this.documentStore = nodeStore.getDocumentStore();
         this.blobCollector = new BlobCollector(nodeStore);
     }
 
     @Override
-    public boolean hasNext() {
-        if (!batchIterator.hasNext()) {
+    protected ReferencedBlob computeNext() {
+        if (blobs.isEmpty()) {
             loadBatch();
         }
-        return batchIterator.hasNext() || !done;
-    }
 
-    @Override
-    public ReferencedBlob next() {
-        // this will load the next batch if required
-        if (!hasNext()) {
-            throw new NoSuchElementException();
+        if (!blobs.isEmpty()) {
+            return blobs.remove();
+        } else {
+            return endOfData();
         }
-        return batchIterator.next();
-    }
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException();
     }
 
     private void loadBatch() {
-        if (done) {
-            return;
+        if (this.iterator == null) {
+            this.iterator = getIteratorOverDocsWithBinaries();
         }
-        batch.clear();
-        // read until at least BATCH_SIZE references are available
-        while (true) {
-            boolean hasMore = loadBatchQuery();
-            if (!hasMore) {
-                done = true;
-                break;
-            }
-            if (batch.size() > BATCH_SIZE) {
-                break;
-            }
+        // Some nodes which have the '_bin' flag set might not have any binaries
+        // in them, so move forward if blobs is still empty and the cursor has
+        // more elements
+        while (iterator.hasNext() && blobs.isEmpty()) {
+            collectBinaries(iterator.next());
         }
-        batchIterator = batch.iterator();
     }
 
-    private boolean loadBatchQuery() {
-        // read about BATCH_SIZE documents
-        List<NodeDocument> list =
-                docStore.query(Collection.NODES, fromKey, NodeDocument.MAX_ID_VALUE, NodeDocument.HAS_BINARY_FLAG,
-                        NodeDocument.HAS_BINARY_VAL,
-                        BATCH_SIZE);
-        boolean hasMore = false;
-        for (NodeDocument doc : list) {
-            if (doc.getId().equals(fromKey)) {
-                // already read
-                continue;
-            }
-            hasMore = true;
-            fromKey = doc.getId();
-            blobCollector.collect(doc, batch);
-        }
-        return hasMore;
+    private void collectBinaries(NodeDocument nodeDocument) {
+        blobCollector.collect(nodeDocument, blobs);
     }
 
+    /**
+     * Override this method to use a document-store-specific iterator.
+     */
+    public Iterator<NodeDocument> getIteratorOverDocsWithBinaries() {
+        int batchSize = 1000;
+        return Utils.getSelectedDocuments(documentStore, NodeDocument.HAS_BINARY_FLAG, NodeDocument.HAS_BINARY_VAL, batchSize)
+                .iterator();
+    }
 
-
+    @Override
+    public void close() {
+        Utils.closeIfCloseable(iterator);
+    }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBlobReferenceRetriever.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBlobReferenceRetriever.java?rev=1794393&r1=1794392&r2=1794393&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBlobReferenceRetriever.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBlobReferenceRetriever.java Mon May  8 15:25:35 2017
@@ -18,15 +18,14 @@
  */
 package org.apache.jackrabbit.oak.plugins.document;
 
-import java.io.Closeable;
 import java.util.Iterator;
 
 import org.apache.jackrabbit.oak.api.Blob;
-import org.apache.jackrabbit.oak.commons.IOUtils;
 import org.apache.jackrabbit.oak.plugins.blob.BlobReferenceRetriever;
 import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
 import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
 import org.apache.jackrabbit.oak.plugins.blob.ReferencedBlob;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,30 +43,29 @@ public class DocumentBlobReferenceRetrie
     @Override
     public void collectReferences(ReferenceCollector collector) {
         int referencesFound = 0;
-        Iterator<ReferencedBlob> blobIterator = nodeStore.getReferencedBlobsIterator();
+        Iterator<ReferencedBlob> blobIterator = null;
         try {
+            blobIterator = nodeStore.getReferencedBlobsIterator();
             while (blobIterator.hasNext()) {
                 ReferencedBlob refBlob = blobIterator.next();
                 Blob blob = refBlob.getBlob();
                 referencesFound++;
 
-                //TODO this mode would also add in memory blobId
-                //Would that be an issue
+                // TODO this mode would also add in memory blobId
+                // Would that be an issue
 
                 if (blob instanceof BlobStoreBlob) {
                     collector.addReference(((BlobStoreBlob) blob).getBlobId(), refBlob.getId());
                 } else {
-                    //TODO Should not rely on toString. Instead obtain
-                    //secure reference and convert that to blobId using
-                    //blobStore
+                    // TODO Should not rely on toString. Instead obtain
+                    // secure reference and convert that to blobId using
+                    // blobStore
 
                     collector.addReference(blob.toString(), refBlob.getId());
                 }
             }
-        }finally{
-            if(blobIterator instanceof Closeable){
-                IOUtils.closeQuietly((Closeable) blobIterator);
-            }
+        } finally {
+            Utils.closeIfCloseable(blobIterator);
         }
         log.debug("Total blob references found (including chunk resolution) [{}]", referencesFound);
     }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java?rev=1794393&r1=1794392&r2=1794393&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java Mon May  8 15:25:35 2017
@@ -569,6 +569,9 @@ public class Utils {
         return c.compare(a, b) <= 0 ? a : b;
     }
 
+    // default batch size for paging through a document store
+    private static final int DEFAULT_BATCH_SIZE = 100;
+
     /**
      * Returns an {@link Iterable} over all {@link NodeDocument}s in the given
      * store. The returned {@linkplain Iterable} does not guarantee a consistent
@@ -580,7 +583,7 @@ public class Utils {
      * @return an {@link Iterable} over all documents in the store.
      */
     public static Iterable<NodeDocument> getAllDocuments(final DocumentStore store) {
-        return internalGetSelectedDocuments(store, null, 0);
+        return internalGetSelectedDocuments(store, null, 0, DEFAULT_BATCH_SIZE);
     }
 
     /**
@@ -616,23 +619,35 @@ public class Utils {
      * @param indexedProperty the name of the indexed property.
      * @param startValue the lower bound value for the indexed property
      *                   (inclusive).
+     * @param batchSize number of documents to fetch at once
      * @return an {@link Iterable} over all documents in the store matching the
      *         condition
      */
     public static Iterable<NodeDocument> getSelectedDocuments(
+            DocumentStore store, String indexedProperty, long startValue, int batchSize) {
+        return internalGetSelectedDocuments(store, indexedProperty, startValue, batchSize);
+    }
+
+    /**
+     * Like {@link #getSelectedDocuments(DocumentStore, String, long, int)} with
+     * a default {@code batchSize}.
+     */
+    public static Iterable<NodeDocument> getSelectedDocuments(
             DocumentStore store, String indexedProperty, long startValue) {
-        return internalGetSelectedDocuments(store, indexedProperty, startValue);
+        return internalGetSelectedDocuments(store, indexedProperty, startValue, DEFAULT_BATCH_SIZE);
     }
 
     private static Iterable<NodeDocument> internalGetSelectedDocuments(
             final DocumentStore store, final String indexedProperty,
-            final long startValue) {
+            final long startValue, final int batchSize) {
+        if (batchSize < 2) {
+            throw new IllegalArgumentException("batchSize must be > 1");
+        }
         return new Iterable<NodeDocument>() {
             @Override
             public Iterator<NodeDocument> iterator() {
                 return new AbstractIterator<NodeDocument>() {
 
-                    private static final int BATCH_SIZE = 100;
                     private String startId = NodeDocument.MIN_ID_VALUE;
 
                     private Iterator<NodeDocument> batch = nextBatch();
@@ -657,8 +672,8 @@ public class Utils {
 
                     private Iterator<NodeDocument> nextBatch() {
                         List<NodeDocument> result = indexedProperty == null ? store.query(Collection.NODES, startId,
-                                NodeDocument.MAX_ID_VALUE, BATCH_SIZE) : store.query(Collection.NODES, startId,
-                                NodeDocument.MAX_ID_VALUE, indexedProperty, startValue, BATCH_SIZE);
+                                NodeDocument.MAX_ID_VALUE, batchSize) : store.query(Collection.NODES, startId,
+                                NodeDocument.MAX_ID_VALUE, indexedProperty, startValue, batchSize);
                         return result.iterator();
                     }
                 };