You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ge...@apache.org on 2019/02/21 21:05:54 UTC

[lucene-solr] branch master updated: Minor cleanup to ExportWriter

This is an automated email from the ASF dual-hosted git repository.

gerlowskija pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ab5ba7  Minor cleanup to ExportWriter
5ab5ba7 is described below

commit 5ab5ba773a9cba5624137145c24e90596a1a0e20
Author: Jason Gerlowski <ge...@apache.org>
AuthorDate: Thu Feb 21 15:52:32 2019 -0500

    Minor cleanup to ExportWriter
---
 .../apache/solr/handler/export/ExportWriter.java   | 129 ++++++++++++---------
 1 file changed, 74 insertions(+), 55 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
index 5dff529..2c1ab96 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
@@ -69,7 +69,21 @@ import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static org.apache.solr.common.util.Utils.makeMap;
 
+/**
+ * Prepares and writes the documents requested by /export requests.
+ *
+ * {@link ExportWriter} gathers and sorts the documents for a core using "stream sorting".
+ * <p>
+ * Stream sorting works by repeatedly processing and modifying a bitmap of matching documents.  Each pass over the
+ * bitmap identifies the smallest {@link #DOCUMENT_BATCH_SIZE} docs that haven't been sent yet and stores them in a
+ * Priority Queue.  They are then exported (written across the wire) and marked as sent (unset in the bitmap).
+ * This process repeats until all matching documents have been sent.
+ * <p>
+ * This streaming approach is light on memory (only {@link #DOCUMENT_BATCH_SIZE} documents are ever stored in memory at
+ * once), and it allows {@link ExportWriter} to scale well with regard to the number of matching documents.
+ */
 public class ExportWriter implements SolrCore.RawWriter, Closeable {
+  private static final int DOCUMENT_BATCH_SIZE = 30000;
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private OutputStreamWriter respWriter;
   final SolrQueryRequest req;
@@ -211,72 +225,77 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
 
   }
 
-  protected void writeDocs(SolrQueryRequest req, IteratorWriter.ItemWriter writer, Sort sort) throws IOException {
-    //Write the data.
-    List<LeafReaderContext> leaves = req.getSearcher().getTopReaderContext().leaves();
-    SortDoc sortDoc = getSortDoc(req.getSearcher(), sort.getSort());
-    int count = 0;
-    int queueSize = 30000;
-    if (totalHits < 30000) {
-      queueSize = totalHits;
+  protected void identifyLowestSortingUnexportedDocs(List<LeafReaderContext> leaves, SortDoc sortDoc, SortQueue queue) throws IOException {
+    queue.reset();
+    SortDoc top = queue.top();
+    for (int i = 0; i < leaves.size(); i++) {
+      sortDoc.setNextReader(leaves.get(i));
+      DocIdSetIterator it = new BitSetIterator(sets[i], 0); // cost is not useful here
+      int docId;
+      while ((docId = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
+        sortDoc.setValues(docId);
+        if (top.lessThan(sortDoc)) {
+          top.setValues(sortDoc);
+          top = queue.updateTop();
+        }
+      }
     }
-    SortQueue queue = new SortQueue(queueSize, sortDoc);
-    SortDoc[] outDocs = new SortDoc[queueSize];
+  }
 
-    while (count < totalHits) {
-      //long begin = System.nanoTime();
-      queue.reset();
-      SortDoc top = queue.top();
-      for (int i = 0; i < leaves.size(); i++) {
-        sortDoc.setNextReader(leaves.get(i));
-        DocIdSetIterator it = new BitSetIterator(sets[i], 0); // cost is not useful here
-        int docId;
-        while ((docId = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
-          sortDoc.setValues(docId);
-          if (top.lessThan(sortDoc)) {
-            top.setValues(sortDoc);
-            top = queue.updateTop();
-          }
-        }
+  protected int transferBatchToArrayForOutput(SortQueue queue, SortDoc[] destinationArr) {
+    int outDocsIndex = -1;
+    for (int i = 0; i < queue.maxSize; i++) {
+      SortDoc s = queue.pop();
+      if (s.docId > -1) {
+        destinationArr[++outDocsIndex] = s;
       }
+    }
 
-      int outDocsIndex = -1;
+    return outDocsIndex;
+  }
 
-      for (int i = 0; i < queueSize; i++) {
-        SortDoc s = queue.pop();
-        if (s.docId > -1) {
-          outDocs[++outDocsIndex] = s;
+  protected void addDocsToItemWriter(List<LeafReaderContext> leaves, IteratorWriter.ItemWriter writer, SortDoc[] docsToExport, int outDocsIndex) throws IOException {
+    try {
+      for (int i = outDocsIndex; i >= 0; --i) {
+        SortDoc s = docsToExport[i];
+        writer.add((MapWriter) ew -> {
+          writeDoc(s, leaves, ew);
+          s.reset();
+        });
+      }
+    } catch (Throwable e) {
+      Throwable ex = e;
+      while (ex != null) {
+        String m = ex.getMessage();
+        if (m != null && m.contains("Broken pipe")) {
+          throw new IgnoreException();
         }
+        ex = ex.getCause();
+      }
+
+      if (e instanceof IOException) {
+        throw ((IOException) e);
+      } else {
+        throw new IOException(e);
       }
+    }
+  }
 
-      //long end = System.nanoTime();
+  protected void writeDocs(SolrQueryRequest req, IteratorWriter.ItemWriter writer, Sort sort) throws IOException {
+    List<LeafReaderContext> leaves = req.getSearcher().getTopReaderContext().leaves();
+    SortDoc sortDoc = getSortDoc(req.getSearcher(), sort.getSort());
+    int count = 0;
+    final int queueSize = Math.min(DOCUMENT_BATCH_SIZE, totalHits);
 
-      count += (outDocsIndex + 1);
+    SortQueue queue = new SortQueue(queueSize, sortDoc);
+    SortDoc[] outDocs = new SortDoc[queueSize];
 
-      try {
-        for (int i = outDocsIndex; i >= 0; --i) {
-          SortDoc s = outDocs[i];
-          writer.add((MapWriter) ew -> {
-            writeDoc(s, leaves, ew);
-            s.reset();
-          });
-        }
-      } catch (Throwable e) {
-        Throwable ex = e;
-        while (ex != null) {
-          String m = ex.getMessage();
-          if (m != null && m.contains("Broken pipe")) {
-            throw new IgnoreException();
-          }
-          ex = ex.getCause();
-        }
+    while (count < totalHits) {
+      identifyLowestSortingUnexportedDocs(leaves, sortDoc, queue);
+      int outDocsIndex = transferBatchToArrayForOutput(queue, outDocs);
 
-        if (e instanceof IOException) {
-          throw ((IOException) e);
-        } else {
-          throw new IOException(e);
-        }
-      }
+      count += (outDocsIndex + 1);
+      addDocsToItemWriter(leaves, writer, outDocs, outDocsIndex);
     }
   }