You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by dw...@apache.org on 2021/03/10 09:59:00 UTC
[lucene] 05/05: SOLR-14608: Resize priority queue size for small result sets and ensure a minimum priority queue size for very small result sets
This is an automated email from the ASF dual-hosted git repository.
dweiss pushed a commit to branch jira/SOLR-14608-export-merge
in repository https://gitbox.apache.org/repos/asf/lucene.git
commit 9de3486336ec6cfc233aa1d8bb3e473eaf2101b3
Author: Joel Bernstein <jb...@apache.org>
AuthorDate: Tue Jan 19 13:40:06 2021 -0500
SOLR-14608: Resize priority queue size for small result sets and ensure a minimum priority queue size for very small result sets
---
.../apache/solr/handler/export/ExportBuffers.java | 2 +-
.../apache/solr/handler/export/ExportWriter.java | 46 ++++++++++++++++++----
2 files changed, 39 insertions(+), 9 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java b/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
index dd004f5..9e5478f 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
@@ -224,7 +224,7 @@ class ExportBuffers {
// );
// allDone.join();
log.debug("-- finished.");
- } catch (Exception e) {
+ } catch (Throwable e) {
Throwable ex = e;
boolean ignore = false;
while (ex != null) {
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
index 4db66e1..953cb4e 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
@@ -110,7 +110,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
//The batch size for the output writer thread.
final int batchSize;
//The max combined size of the segment level priority queues.
- final int priorityQueueSize;
+ private int priorityQueueSize;
StreamExpression streamExpression;
StreamContext streamContext;
FieldWriter[] fieldWriters;
@@ -295,13 +295,18 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
streamContext.put(CommonParams.SORT, params.get(CommonParams.SORT));
}
- writer.writeMap(m -> {
- m.put("responseHeader", singletonMap("status", 0));
- m.put("response", (MapWriter) mw -> {
- mw.put("numFound", totalHits);
- mw.put("docs", (IteratorWriter) iw -> writeDocs(req, os, iw, sort));
+ try {
+ writer.writeMap(m -> {
+ m.put("responseHeader", singletonMap("status", 0));
+ m.put("response", (MapWriter) mw -> {
+ mw.put("numFound", totalHits);
+ mw.put("docs", (IteratorWriter) iw -> writeDocs(req, os, iw, sort));
+ });
});
- });
+ } catch (java.io.EOFException e) {
+ log.info("Caught Eof likely caused by early client disconnect");
+ }
+
if (streamContext != null) {
streamContext = null;
}
@@ -421,7 +426,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
long startExchangeBuffers = System.nanoTime();
buffers.exchangeBuffers();
long endExchangeBuffers = System.nanoTime();
- if(log.isDebugEnabled()) {
+ if (log.isDebugEnabled()) {
log.debug("Waited for reader thread {}:", Long.toString(((endExchangeBuffers - startExchangeBuffers) / 1000000)));
}
} finally {
@@ -662,11 +667,36 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
totalDocs += leaves.get(i).reader().maxDoc();
}
+ //Resize the priorityQueueSize down for small result sets.
+ this.priorityQueueSize = Math.min(this.priorityQueueSize, (int)(this.totalHits*1.5));
+
+ if(log.isDebugEnabled()) {
+ log.debug("Total priority queue size {}:", this.priorityQueueSize);
+ }
+
int[] sizes = new int[leaves.size()];
+
+ int combineQueueSize = 0;
for (int i = 0; i < leaves.size(); i++) {
long maxDoc = leaves.get(i).reader().maxDoc();
int sortQueueSize = Math.min((int) (((double) maxDoc / (double) totalDocs) * this.priorityQueueSize), batchSize);
+
+ //Protect against too small a queue size as well
+ if(sortQueueSize < 10) {
+ sortQueueSize = 10;
+ }
+
+ if(log.isDebugEnabled()) {
+ log.debug("Segment priority queue size {}:", sortQueueSize);
+ }
+
sizes[i] = sortQueueSize;
+ combineQueueSize += sortQueueSize;
+
+ }
+
+ if(log.isDebugEnabled()) {
+ log.debug("Combined priority queue size {}:", combineQueueSize);
}
SegmentIterator[] segmentIterators = new SegmentIterator[leaves.size()];