You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by dw...@apache.org on 2021/03/10 09:59:00 UTC

[lucene] 05/05: SOLR-14608: Resize priority queue size for small result sets and ensure a minimum priority queue size for very small result sets

This is an automated email from the ASF dual-hosted git repository.

dweiss pushed a commit to branch jira/SOLR-14608-export-merge
in repository https://gitbox.apache.org/repos/asf/lucene.git

commit 9de3486336ec6cfc233aa1d8bb3e473eaf2101b3
Author: Joel Bernstein <jb...@apache.org>
AuthorDate: Tue Jan 19 13:40:06 2021 -0500

    SOLR-14608: Resize priority queue size for small result sets and ensure a minimum priority queue size for very small result sets
---
 .../apache/solr/handler/export/ExportBuffers.java  |  2 +-
 .../apache/solr/handler/export/ExportWriter.java   | 46 ++++++++++++++++++----
 2 files changed, 39 insertions(+), 9 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java b/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
index dd004f5..9e5478f 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
@@ -224,7 +224,7 @@ class ExportBuffers {
 //        );
 //        allDone.join();
       log.debug("-- finished.");
-    } catch (Exception e) {
+    } catch (Throwable e) {
       Throwable ex = e;
       boolean ignore = false;
       while (ex != null) {
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
index 4db66e1..953cb4e 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
@@ -110,7 +110,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
   //The batch size for the output writer thread.
   final int batchSize;
   //The max combined size of the segment level priority queues.
-  final int priorityQueueSize;
+  private int priorityQueueSize;
   StreamExpression streamExpression;
   StreamContext streamContext;
   FieldWriter[] fieldWriters;
@@ -295,13 +295,18 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
       streamContext.put(CommonParams.SORT, params.get(CommonParams.SORT));
     }
 
-    writer.writeMap(m -> {
-      m.put("responseHeader", singletonMap("status", 0));
-      m.put("response", (MapWriter) mw -> {
-        mw.put("numFound", totalHits);
-        mw.put("docs", (IteratorWriter) iw -> writeDocs(req, os, iw, sort));
+    try {
+      writer.writeMap(m -> {
+        m.put("responseHeader", singletonMap("status", 0));
+        m.put("response", (MapWriter) mw -> {
+          mw.put("numFound", totalHits);
+          mw.put("docs", (IteratorWriter) iw -> writeDocs(req, os, iw, sort));
+        });
       });
-    });
+    } catch (java.io.EOFException e) {
+      log.info("Caught Eof likely caused by early client disconnect");
+    }
+
     if (streamContext != null) {
       streamContext = null;
     }
@@ -421,7 +426,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
             long startExchangeBuffers = System.nanoTime();
             buffers.exchangeBuffers();
             long endExchangeBuffers = System.nanoTime();
-            if(log.isDebugEnabled()) {
+            if (log.isDebugEnabled()) {
               log.debug("Waited for reader thread {}:", Long.toString(((endExchangeBuffers - startExchangeBuffers) / 1000000)));
             }
           } finally {
@@ -662,11 +667,36 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
         totalDocs += leaves.get(i).reader().maxDoc();
       }
 
+      //Resize the priorityQueueSize down for small result sets.
+      this.priorityQueueSize = Math.min(this.priorityQueueSize, (int)(this.totalHits*1.5));
+
+      if(log.isDebugEnabled()) {
+        log.debug("Total priority queue size {}:", this.priorityQueueSize);
+      }
+
       int[] sizes = new int[leaves.size()];
+
+      int combineQueueSize = 0;
       for (int i = 0; i < leaves.size(); i++) {
         long maxDoc = leaves.get(i).reader().maxDoc();
         int sortQueueSize = Math.min((int) (((double) maxDoc / (double) totalDocs) * this.priorityQueueSize), batchSize);
+
+        //Protect against too small a queue size as well
+        if(sortQueueSize < 10) {
+          sortQueueSize = 10;
+        }
+
+        if(log.isDebugEnabled()) {
+          log.debug("Segment priority queue size {}:", sortQueueSize);
+        }
+
         sizes[i] = sortQueueSize;
+        combineQueueSize += sortQueueSize;
+
+      }
+
+      if(log.isDebugEnabled()) {
+        log.debug("Combined priority queue size {}:", combineQueueSize);
       }
 
       SegmentIterator[] segmentIterators = new SegmentIterator[leaves.size()];