Posted to commits@lucene.apache.org by ab...@apache.org on 2020/07/03 11:50:53 UTC

[lucene-solr] branch branch_8x updated: SOLR-14537: Improve performance of ExportWriter.

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8x by this push:
     new dc2de74  SOLR-14537: Improve performance of ExportWriter.
dc2de74 is described below

commit dc2de7418cbb2f700103d8149e6d1416e24ff109
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Fri Jul 3 12:57:35 2020 +0200

    SOLR-14537: Improve performance of ExportWriter.
---
 solr/CHANGES.txt                                   |   2 +
 .../org/apache/solr/handler/ExportHandler.java     |  16 +-
 .../solr/handler/export/DoubleValueSortDoc.java    |   1 +
 .../apache/solr/handler/export/ExportBuffers.java  | 268 +++++++++++++++
 .../apache/solr/handler/export/ExportWriter.java   | 374 ++++++++-------------
 .../solr/handler/export/ExportWriterStream.java    | 242 +++++++++++++
 .../org/apache/solr/handler/export/LongValue.java  |   4 +-
 .../apache/solr/handler/export/PriorityQueue.java  | 218 ------------
 .../solr/handler/export/QuadValueSortDoc.java      |   1 +
 .../solr/handler/export/SingleValueSortDoc.java    |   8 +-
 .../org/apache/solr/handler/export/SortDoc.java    |  23 +-
 .../org/apache/solr/handler/export/SortQueue.java  | 155 ++++++++-
 .../solr/handler/export/StringFieldWriter.java     |  13 +-
 .../apache/solr/handler/export/StringValue.java    |  14 +-
 .../solr/handler/export/TripleValueSortDoc.java    |   1 +
 .../solr/handler/export/TestExportWriter.java      |  31 +-
 solr/solr-ref-guide/src/exporting-result-sets.adoc |   2 +
 .../org/apache/solr/client/solrj/io/Tuple.java     |  17 +
 18 files changed, 899 insertions(+), 491 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 0c3f9a5..e6c3daf 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -17,6 +17,8 @@ Improvements
 
 * SOLR-14523: Enhance gradle logging calls validation: eliminate getMessage() (Andras Salamon via Erick Erickson)
 
+* SOLR-14537: Improve performance of ExportWriter. (ab, Joel Bernstein)
+
 
 Optimizations
 ---------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/ExportHandler.java b/solr/core/src/java/org/apache/solr/handler/ExportHandler.java
index 64999dc..69a71b4 100644
--- a/solr/core/src/java/org/apache/solr/handler/ExportHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/ExportHandler.java
@@ -34,6 +34,9 @@ import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.component.SearchHandler;
 import org.apache.solr.handler.export.ExportWriter;
+import org.apache.solr.handler.export.ExportWriterStream;
+import org.apache.solr.metrics.SolrMetricManager;
+import org.apache.solr.metrics.SolrMetricsContext;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
 import org.slf4j.Logger;
@@ -51,6 +54,7 @@ public class ExportHandler extends SearchHandler {
   private String coreName;
   private SolrClientCache solrClientCache;
   private StreamContext initialStreamContext;
+  private String writerMetricsPath;
 
   public static class ExportHandlerStreamFactory extends SolrDefaultStreamFactory {
     static final String[] forbiddenStreams = new String[] {
@@ -67,11 +71,17 @@ public class ExportHandler extends SearchHandler {
       for (String function : forbiddenStreams) {
         this.withoutFunctionName(function);
       }
-      this.withFunctionName("input", ExportWriter.ExportWriterStream.class);
+      this.withFunctionName("input", ExportWriterStream.class);
     }
   }
 
   @Override
+  public void initializeMetrics(SolrMetricsContext parentContext, String scope) {
+    super.initializeMetrics(parentContext, scope);
+    this.writerMetricsPath = SolrMetricManager.mkName("writer", getCategory().toString(), scope);
+  }
+
+  @Override
   public void inform(SolrCore core) {
     super.inform(core);
     String defaultCollection;
@@ -98,6 +108,7 @@ public class ExportHandler extends SearchHandler {
     initialStreamContext.setObjectCache(objectCache);
     initialStreamContext.put("core", this.coreName);
     initialStreamContext.put("solr-core", core);
+    initialStreamContext.put("exportHandler", this);
   }
 
   @Override
@@ -112,6 +123,7 @@ public class ExportHandler extends SearchHandler {
     Map<String, String> map = new HashMap<>(1);
     map.put(CommonParams.WT, ReplicationHandler.FILE_STREAM);
     req.setParams(SolrParams.wrapDefaults(new MapSolrParams(map),req.getParams()));
-    rsp.add(ReplicationHandler.FILE_STREAM, new ExportWriter(req, rsp, wt, initialStreamContext));
+    rsp.add(ReplicationHandler.FILE_STREAM, new ExportWriter(req, rsp, wt, initialStreamContext, solrMetricsContext,
+        writerMetricsPath, this));
   }
 }
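
The handler changes above wire Dropwizard timers through SolrMetricsContext and hand them to ExportWriter. Every timed section added by this commit uses the same time()/stop() pattern; a minimal standalone sketch of that pattern (the registry and names here are illustrative stand-ins, not part of the commit -- in the commit the Timer comes from SolrMetricsContext.timer(...)):

    import com.codahale.metrics.MetricRegistry;
    import com.codahale.metrics.Timer;

    public class TimerPatternSketch {
      public static void main(String[] args) throws Exception {
        MetricRegistry registry = new MetricRegistry();
        Timer writeOutputBufferTimer = registry.timer("writeOutputBuffer");
        Timer.Context timerContext = writeOutputBufferTimer.time();
        try {
          Thread.sleep(10); // the guarded work, e.g. writing one output buffer
        } finally {
          timerContext.stop(); // records elapsed time even when an exception is thrown
        }
        System.out.println("count=" + writeOutputBufferTimer.getCount());
      }
    }
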
diff --git a/solr/core/src/java/org/apache/solr/handler/export/DoubleValueSortDoc.java b/solr/core/src/java/org/apache/solr/handler/export/DoubleValueSortDoc.java
index 25899f4..8c2a92a 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/DoubleValueSortDoc.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/DoubleValueSortDoc.java
@@ -44,6 +44,7 @@ class DoubleValueSortDoc extends SingleValueSortDoc {
   public void reset() {
     this.docId = -1;
     this.docBase = -1;
+    this.ord = -1;
     value1.reset();
     value2.reset();
   }
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java b/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
new file mode 100644
index 0000000..393067a
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.export;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+
+import com.codahale.metrics.Timer;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.Sort;
+import org.apache.solr.common.IteratorWriter;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class implementing a "double buffering" producer / consumer.
+ */
+class ExportBuffers {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  static final long EXCHANGE_TIMEOUT_SECONDS = 600;
+  static final String EXPORT_BUFFERS_KEY = "__eb__";
+
+  final Buffer bufferOne;
+  final Buffer bufferTwo;
+  final List<LeafReaderContext> leaves;
+  final ExportWriter exportWriter;
+  final OutputStream os;
+  final Timer writeOutputBufferTimer;
+  final Timer fillerWaitTimer;
+  final Timer writerWaitTimer;
+  final IteratorWriter.ItemWriter rawWriter;
+  final IteratorWriter.ItemWriter writer;
+  final CyclicBarrier barrier;
+  final int totalHits;
+  Buffer fillBuffer;
+  Buffer outputBuffer;
+  Runnable filler;
+  ExecutorService service;
+  Throwable error;
+  LongAdder outputCounter = new LongAdder();
+  volatile boolean shutDown = false;
+
+  ExportBuffers(ExportWriter exportWriter, List<LeafReaderContext> leaves, SolrIndexSearcher searcher,
+                OutputStream os, IteratorWriter.ItemWriter rawWriter, Sort sort, int queueSize, int totalHits,
+                Timer writeOutputBufferTimer, Timer fillerWaitTimer, Timer writerWaitTimer) throws IOException {
+    this.exportWriter = exportWriter;
+    this.leaves = leaves;
+    this.os = os;
+    this.rawWriter = rawWriter;
+    this.writer = new IteratorWriter.ItemWriter() {
+      @Override
+      public IteratorWriter.ItemWriter add(Object o) throws IOException {
+        rawWriter.add(o);
+        outputCounter.increment();
+        return this;
+      }
+    };
+    this.writeOutputBufferTimer = writeOutputBufferTimer;
+    this.fillerWaitTimer = fillerWaitTimer;
+    this.writerWaitTimer = writerWaitTimer;
+    this.bufferOne = new Buffer(queueSize);
+    this.bufferTwo = new Buffer(queueSize);
+    this.totalHits = totalHits;
+    fillBuffer = bufferOne;
+    outputBuffer = bufferTwo;
+    SortDoc writerSortDoc = exportWriter.getSortDoc(searcher, sort.getSort());
+    bufferOne.initialize(writerSortDoc);
+    bufferTwo.initialize(writerSortDoc);
+    barrier = new CyclicBarrier(2, () -> swapBuffers());
+    filler = () -> {
+      try {
+        log.debug("--- filler start {}", Thread.currentThread());
+        SortDoc sortDoc = exportWriter.getSortDoc(searcher, sort.getSort());
+        Buffer buffer = getFillBuffer();
+        SortQueue queue = new SortQueue(queueSize, sortDoc);
+        long lastOutputCounter = 0;
+        for (int count = 0; count < totalHits; ) {
+          log.debug("--- filler fillOutDocs in {}", fillBuffer);
+          exportWriter.fillOutDocs(leaves, sortDoc, queue, buffer);
+          count += (buffer.outDocsIndex + 1);
+          log.debug("--- filler count={}, exchange buffer from {}", count, buffer);
+          Timer.Context timerContext = getFillerWaitTimer().time();
+          try {
+            exchangeBuffers();
+          } finally {
+            timerContext.stop();
+          }
+          buffer = getFillBuffer();
+          if (outputCounter.longValue() > lastOutputCounter) {
+            lastOutputCounter = outputCounter.longValue();
+            flushOutput();
+          }
+          log.debug("--- filler got empty buffer {}", buffer);
+        }
+        buffer.outDocsIndex = Buffer.NO_MORE_DOCS;
+        log.debug("--- filler final exchange buffer from {}", buffer);
+        Timer.Context timerContext = getFillerWaitTimer().time();
+        try {
+          exchangeBuffers();
+        } finally {
+          timerContext.stop();
+        }
+        buffer = getFillBuffer();
+        log.debug("--- filler final got buffer {}", buffer);
+      } catch (Throwable e) {
+        log.error("filler", e);
+        error(e);
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+        shutdownNow();
+      }
+    };
+  }
+
+  public void exchangeBuffers() throws Exception {
+    log.debug("---- wait exchangeBuffers from {}", Thread.currentThread());
+    barrier.await(EXCHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+  }
+
+  public void error(Throwable t) {
+    error = t;
+    // break the lock on the other thread too
+    barrier.reset();
+  }
+
+  public Throwable getError() {
+    return error;
+  }
+
+  private void swapBuffers() {
+    log.debug("--- swap buffers");
+    Buffer one = fillBuffer;
+    fillBuffer = outputBuffer;
+    outputBuffer = one;
+  }
+
+  private void flushOutput() throws IOException {
+    //os.flush();
+  }
+
+  // initial output buffer
+  public Buffer getOutputBuffer() {
+    return outputBuffer;
+  }
+
+  public Buffer getFillBuffer() {
+    return fillBuffer;
+  }
+
+  public Timer getWriteOutputBufferTimer() {
+    return writeOutputBufferTimer;
+  }
+
+  public Timer getFillerWaitTimer() {
+    return fillerWaitTimer;
+  }
+
+  public Timer getWriterWaitTimer() {
+    return writerWaitTimer;
+  }
+
+  // decorated writer that keeps track of number of writes
+  public IteratorWriter.ItemWriter getWriter() {
+    return writer;
+  }
+
+  public void shutdownNow() {
+    if (service != null) {
+      log.debug("--- shutting down buffers");
+      service.shutdownNow();
+      service = null;
+    }
+    shutDown = true;
+  }
+
+  public boolean isShutDown() {
+    return shutDown;
+  }
+
+  /**
+   * Start processing and block until complete or Exception is thrown.
+   *
+   * @param writer writer that exchanges and processes buffers received from a producer.
+   * @throws IOException on errors
+   */
+  public void run(Callable<Boolean> writer) throws IOException {
+    service = ExecutorUtil.newMDCAwareFixedThreadPool(1, new SolrNamedThreadFactory("ExportBuffers"));
+    try {
+      CompletableFuture.runAsync(filler, service);
+      writer.call();
+
+      // alternatively we could run the writer in a separate thread:
+//        CompletableFuture<Void> allDone = CompletableFuture.allOf(
+//            CompletableFuture.runAsync(filler, service),
+//            CompletableFuture.runAsync(() -> {
+//              try {
+//                writer.call();
+//              } catch (Exception e) {
+//                log.error("writer", e);
+//                shutdownNow();
+//              }
+//            }, service)
+//        );
+//        allDone.join();
+      log.debug("-- finished.");
+    } catch (Exception e) {
+      log.error("Exception running filler / writer", e);
+      error(e);
+      //
+    } finally {
+      log.debug("--- all done, shutting down buffers");
+      shutdownNow();
+    }
+  }
+
+  public static final class Buffer {
+    static final int EMPTY = -1;
+    static final int NO_MORE_DOCS = -2;
+
+    int outDocsIndex = EMPTY;
+    SortDoc[] outDocs;
+
+    public Buffer(int size) {
+      outDocs = new SortDoc[size];
+    }
+
+    public void initialize(SortDoc proto) {
+      outDocsIndex = EMPTY;
+      for (int i = 0; i < outDocs.length; i++) {
+        outDocs[i] = proto.copy();
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "Buffer@" + Integer.toHexString(hashCode()) + "{" +
+          "outDocsIndex=" + outDocsIndex +
+          '}';
+    }
+  }
+}
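
ExportBuffers coordinates exactly two threads: a filler that collects the next batch and a writer that serializes the previous one, meeting at a CyclicBarrier whose barrier action swaps the two buffers. A stripped-down sketch of that double-buffering handshake (standalone and illustrative; simple String buffers stand in for the SortDoc arrays):

    import java.util.concurrent.CyclicBarrier;

    public class DoubleBufferSketch {
      static volatile String fillBuffer = "A";   // producer writes here
      static volatile String outputBuffer = "B"; // consumer reads here

      public static void main(String[] args) throws Exception {
        // The barrier action runs once per generation, after both threads arrive:
        // it swaps buffer roles, just like ExportBuffers.swapBuffers().
        CyclicBarrier barrier = new CyclicBarrier(2, () -> {
          String tmp = fillBuffer;
          fillBuffer = outputBuffer;
          outputBuffer = tmp;
        });
        Thread filler = new Thread(() -> {
          try {
            for (int batch = 0; batch < 3; batch++) {
              System.out.println("filling " + fillBuffer + " with batch " + batch);
              barrier.await(); // hand the filled buffer to the writer
            }
          } catch (Exception e) {
            barrier.reset(); // like ExportBuffers.error(): unblock the other side
          }
        });
        filler.start();
        for (int batch = 0; batch < 3; batch++) {
          barrier.await(); // wait until the filler hands over a buffer
          System.out.println("writing " + outputBuffer + " for batch " + batch);
        }
        filler.join();
      }
    }

While the writer drains one buffer the filler is already populating the other, which is where the throughput gain over the old single-threaded fill-then-write loop comes from.
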
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
index a0cde08..d2d6129 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
@@ -26,6 +26,7 @@ import java.lang.invoke.MethodHandles;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 
+import com.codahale.metrics.Timer;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.SortedDocValues;
@@ -36,18 +37,10 @@ import org.apache.lucene.util.BitSetIterator;
 import org.apache.lucene.util.FixedBitSet;
 import org.apache.solr.client.solrj.impl.BinaryResponseParser;
 import org.apache.solr.client.solrj.io.Tuple;
-import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
-import org.apache.solr.client.solrj.io.comp.FieldComparator;
-import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
-import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.client.solrj.io.stream.StreamContext;
 import org.apache.solr.client.solrj.io.stream.TupleStream;
-import org.apache.solr.client.solrj.io.stream.expr.Explanation;
-import org.apache.solr.client.solrj.io.stream.expr.Expressible;
-import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
-import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 import org.apache.solr.common.IteratorWriter;
@@ -60,6 +53,8 @@ import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.params.StreamParams;
 import org.apache.solr.common.util.JavaBinCodec;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.ExportHandler;
+import org.apache.solr.metrics.SolrMetricsContext;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.request.SolrRequestInfo;
 import org.apache.solr.response.BinaryResponseWriter;
@@ -93,29 +88,26 @@ import static org.apache.solr.common.util.Utils.makeMap;
  * {@link ExportWriter} gathers and sorts the documents for a core using "stream sorting".
  * <p>
  * Stream sorting works by repeatedly processing and modifying a bitmap of matching documents.  Each pass over the
- * bitmap identifies the smallest {@link #DOCUMENT_BATCH_SIZE} docs that haven't been sent yet and stores them in a
+ * bitmap identifies the lowest-sorting docs (up to {@link #DEFAULT_BATCH_SIZE} per batch by default) that haven't been sent yet and stores them in a
  * Priority Queue.  They are then exported (written across the wire) and marked as sent (unset in the bitmap).
  * This process repeats until all matching documents have been sent.
  * <p>
- * This streaming approach is light on memory (only {@link #DOCUMENT_BATCH_SIZE} documents are ever stored in memory at
+ * This streaming approach is light on memory (only up to 2x batch size documents are ever stored in memory at
  * once), and it allows {@link ExportWriter} to scale well with regard to numDocs.
  */
 public class ExportWriter implements SolrCore.RawWriter, Closeable {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private static final int DOCUMENT_BATCH_SIZE = 30000;
-
-  private static final String EXPORT_WRITER_KEY = "__ew__";
-  private static final String SORT_DOCS_KEY = "_ew_docs_";
-  private static final String TOTAL_HITS_KEY = "_ew_totalHits_";
-  private static final String LEAF_READERS_KEY = "_ew_leaves_";
-  private static final String SORT_QUEUE_KEY = "_ew_queue_";
-  private static final String SORT_DOC_KEY = "_ew_sort_";
+  public static final String BATCH_SIZE_PARAM = "batchSize";
+  public static final int DEFAULT_BATCH_SIZE = 30000;
 
   private OutputStreamWriter respWriter;
   final SolrQueryRequest req;
   final SolrQueryResponse res;
   final StreamContext initialStreamContext;
+  final SolrMetricsContext solrMetricsContext;
+  final String metricsPath;
+  final int batchSize;
   StreamExpression streamExpression;
   StreamContext streamContext;
   FieldWriter[] fieldWriters;
@@ -123,143 +115,28 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
   FixedBitSet[] sets = null;
   PushWriter writer;
   private String wt;
+  final Timer identifyLowestSortingDocTimer;
+  final Timer transferBatchToBufferTimer;
+  final Timer writeOutputBufferTimer;
+  final Timer writerWaitTimer;
+  final Timer fillerWaitTimer;
 
-  private static class TupleEntryWriter implements EntryWriter {
-    Tuple tuple;
-
-    void setTuple(Tuple tuple) {
-      this.tuple = tuple;
-    }
-
-    @Override
-    public EntryWriter put(CharSequence k, Object v) throws IOException {
-      tuple.put(k, v);
-      return this;
-    }
-  }
-
-  public static class ExportWriterStream extends TupleStream implements Expressible {
-    StreamContext context;
-    StreamComparator streamComparator;
-    int pos = -1;
-    int outDocIndex = -1;
-    int count;
-    SortDoc sortDoc;
-    SortQueue queue;
-    SortDoc[] docs;
-    int totalHits;
-    ExportWriter exportWriter;
-    List<LeafReaderContext> leaves;
-    final TupleEntryWriter entryWriter = new TupleEntryWriter();
-
-    public ExportWriterStream(StreamExpression expression, StreamFactory factory) throws IOException {
-      streamComparator = parseComp(factory.getDefaultSort());
-    }
-
-    @Override
-    public void setStreamContext(StreamContext context) {
-      this.context = context;
-    }
-
-    @Override
-    public List<TupleStream> children() {
-      return null;
-    }
-
-    private StreamComparator parseComp(String sort) throws IOException {
-
-      String[] sorts = sort.split(",");
-      StreamComparator[] comps = new StreamComparator[sorts.length];
-      for(int i=0; i<sorts.length; i++) {
-        String s = sorts[i];
-
-        String[] spec = s.trim().split("\\s+"); //This should take into account spaces in the sort spec.
-
-        if (spec.length != 2) {
-          throw new IOException("Invalid sort spec:" + s);
-        }
-
-        String fieldName = spec[0].trim();
-        String order = spec[1].trim();
-
-        comps[i] = new FieldComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING);
-      }
-
-      if(comps.length > 1) {
-        return new MultipleFieldComparator(comps);
-      } else {
-        return comps[0];
-      }
-    }
 
-    @Override
-    @SuppressWarnings({"unchecked"})
-    public void open() throws IOException {
-      docs = (SortDoc[]) context.get(SORT_DOCS_KEY);
-      queue = (SortQueue) context.get(SORT_QUEUE_KEY);
-      sortDoc = (SortDoc) context.get(SORT_DOC_KEY);
-      totalHits = (Integer) context.get(TOTAL_HITS_KEY);
-      exportWriter = (ExportWriter) context.get(EXPORT_WRITER_KEY);
-      leaves = (List<LeafReaderContext>) context.get(LEAF_READERS_KEY);
-      count = 0;
-    }
-
-    @Override
-    public void close() throws IOException {
-      exportWriter = null;
-      leaves = null;
-    }
-
-    @Override
-    public Tuple read() throws IOException {
-      if (pos < 0) {
-        if (count < totalHits) {
-          outDocIndex = exportWriter.fillOutDocs(leaves, sortDoc, queue, docs);
-          count += (outDocIndex + 1);
-          pos = outDocIndex;
-        } else {
-          return Tuple.EOF();
-        }
-      }
-      if (pos < 0) {
-        return Tuple.EOF();
-      }
-      Tuple tuple = new Tuple();
-      entryWriter.setTuple(tuple);
-      SortDoc s = docs[pos];
-      exportWriter.writeDoc(s, leaves, entryWriter);
-      s.reset();
-      pos--;
-      return tuple;
-    }
-
-    @Override
-    public StreamComparator getStreamSort() {
-      return streamComparator;
-    }
-
-    @Override
-    public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
-      StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
-      return expression;
-    }
-
-    @Override
-    public Explanation toExplanation(StreamFactory factory) throws IOException {
-      return new StreamExplanation(getStreamNodeId().toString())
-          .withFunctionName("input")
-          .withImplementingClass(this.getClass().getName())
-          .withExpressionType(Explanation.ExpressionType.STREAM_SOURCE)
-          .withExpression("--non-expressible--");
-    }
-  }
-
-
-  public ExportWriter(SolrQueryRequest req, SolrQueryResponse res, String wt, StreamContext initialStreamContext) {
+  public ExportWriter(SolrQueryRequest req, SolrQueryResponse res, String wt,
+                      StreamContext initialStreamContext, SolrMetricsContext solrMetricsContext,
+                      String metricsPath, ExportHandler exportHandler) {
     this.req = req;
     this.res = res;
     this.wt = wt;
     this.initialStreamContext = initialStreamContext;
+    this.solrMetricsContext = solrMetricsContext;
+    this.metricsPath = metricsPath;
+    this.batchSize = req.getParams().getInt(BATCH_SIZE_PARAM, DEFAULT_BATCH_SIZE);
+    identifyLowestSortingDocTimer = solrMetricsContext.timer(exportHandler, "identifyLowestSortingDoc", metricsPath);
+    transferBatchToBufferTimer = solrMetricsContext.timer(exportHandler, "transferBatchToBuffer", metricsPath);
+    writeOutputBufferTimer = solrMetricsContext.timer(exportHandler, "writeOutputBuffer", metricsPath);
+    writerWaitTimer = solrMetricsContext.timer(exportHandler, "writerWaitTimer", metricsPath);
+    fillerWaitTimer = solrMetricsContext.timer(exportHandler, "fillerWaitTimer", metricsPath);
   }
 
   @Override
@@ -408,7 +285,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
       m.put("responseHeader", singletonMap("status", 0));
       m.put("response", (MapWriter) mw -> {
         mw.put("numFound", totalHits);
-        mw.put("docs", (IteratorWriter) iw -> writeDocs(req, iw, sort));
+        mw.put("docs", (IteratorWriter) iw -> writeDocs(req, os, iw, sort));
       });
     });
     if (streamContext != null) {
@@ -426,121 +303,158 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
   }
 
   protected void identifyLowestSortingUnexportedDocs(List<LeafReaderContext> leaves, SortDoc sortDoc, SortQueue queue) throws IOException {
-    queue.reset();
-    SortDoc top = queue.top();
-    for (int i = 0; i < leaves.size(); i++) {
-      sortDoc.setNextReader(leaves.get(i));
-      DocIdSetIterator it = new BitSetIterator(sets[i], 0); // cost is not useful here
-      int docId;
-      while ((docId = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
-        sortDoc.setValues(docId);
-        if (top.lessThan(sortDoc)) {
-          top.setValues(sortDoc);
-          top = queue.updateTop();
+    Timer.Context timerContext = identifyLowestSortingDocTimer.time();
+    try {
+      queue.reset();
+      SortDoc top = queue.top();
+      for (int i = 0; i < leaves.size(); i++) {
+        sortDoc.setNextReader(leaves.get(i));
+        DocIdSetIterator it = new BitSetIterator(sets[i], 0); // cost is not useful here
+        int docId;
+        while ((docId = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
+          sortDoc.setValues(docId);
+          if (top.lessThan(sortDoc)) {
+            top.setValues(sortDoc);
+            top = queue.updateTop();
+          }
         }
       }
+    } finally {
+      timerContext.stop();
     }
   }
 
-  protected int transferBatchToArrayForOutput(SortQueue queue, SortDoc[] destinationArr) {
-    int outDocsIndex = -1;
-    for (int i = 0; i < queue.maxSize; i++) {
-      SortDoc s = queue.pop();
-      if (s.docId > -1) {
-        destinationArr[++outDocsIndex] = s;
-      }
-    }
-
-    return outDocsIndex;
-  }
-
-  protected void addDocsToItemWriter(List<LeafReaderContext> leaves, IteratorWriter.ItemWriter writer, SortDoc[] docsToExport, int outDocsIndex) throws IOException {
+  protected void transferBatchToBufferForOutput(SortQueue queue,
+                                                List<LeafReaderContext> leaves,
+                                                ExportBuffers.Buffer destination) throws IOException {
+    Timer.Context timerContext = transferBatchToBufferTimer.time();
     try {
-      for (int i = outDocsIndex; i >= 0; --i) {
-        SortDoc s = docsToExport[i];
-        writer.add((MapWriter) ew -> {
-          writeDoc(s, leaves, ew);
-          s.reset();
-        });
-      }
-    } catch (Throwable e) {
-      Throwable ex = e;
-      while (ex != null) {
-        String m = ex.getMessage();
-        if (m != null && m.contains("Broken pipe")) {
-          throw new IgnoreException();
+      int outDocsIndex = -1;
+      for (int i = 0; i < queue.maxSize; i++) {
+        SortDoc s = queue.pop();
+        if (s.docId > -1) {
+          destination.outDocs[++outDocsIndex].setValues(s);
+          // remove this doc id from the matching bitset, it's been exported
+          sets[s.ord].clear(s.docId);
+          s.reset(); // reuse
         }
-        ex = ex.getCause();
       }
-
-      if (e instanceof IOException) {
-        throw ((IOException) e);
-      } else {
-        throw new IOException(e);
+      destination.outDocsIndex = outDocsIndex;
+    } catch (Throwable t) {
+      log.error("transfer", t);
+      if (t instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
       }
+      throw t;
+    } finally {
+      timerContext.stop();
     }
   }
 
-  protected void writeDocs(SolrQueryRequest req, IteratorWriter.ItemWriter writer, Sort sort) throws IOException {
+  protected void writeDocs(SolrQueryRequest req, OutputStream os, IteratorWriter.ItemWriter writer, Sort sort) throws IOException {
     List<LeafReaderContext> leaves = req.getSearcher().getTopReaderContext().leaves();
-    SortDoc sortDoc = getSortDoc(req.getSearcher(), sort.getSort());
-    final int queueSize = Math.min(DOCUMENT_BATCH_SIZE, totalHits);
+    final int queueSize = Math.min(batchSize, totalHits);
 
-    SortQueue queue = new SortQueue(queueSize, sortDoc);
-    SortDoc[] outDocs = new SortDoc[queueSize];
+    ExportBuffers buffers = new ExportBuffers(this, leaves, req.getSearcher(), os, writer, sort, queueSize, totalHits,
+        writeOutputBufferTimer, fillerWaitTimer, writerWaitTimer);
 
     if (streamExpression != null) {
-      streamContext.put(SORT_DOCS_KEY, outDocs);
-      streamContext.put(SORT_QUEUE_KEY, queue);
-      streamContext.put(SORT_DOC_KEY, sortDoc);
-      streamContext.put(TOTAL_HITS_KEY, totalHits);
-      streamContext.put(EXPORT_WRITER_KEY, this);
-      streamContext.put(LEAF_READERS_KEY, leaves);
-      TupleStream tupleStream = createTupleStream();
-      tupleStream.open();
-      for (;;) {
-        final Tuple t = tupleStream.read();
-        if (t == null) {
-          break;
-        }
-        if (t.EOF) {
-          break;
-        }
-        writer.add((MapWriter) ew -> t.writeMap(ew));
+      streamContext.put(ExportBuffers.EXPORT_BUFFERS_KEY, buffers);
+      final TupleStream tupleStream;
+      try {
+        tupleStream = createTupleStream();
+        tupleStream.open();
+      } catch (Exception e) {
+        buffers.getWriter().add((MapWriter) ew -> Tuple.EXCEPTION(e, true).writeMap(ew));
+        return;
       }
+      buffers.run(() -> {
+        for (;;) {
+          if (Thread.currentThread().isInterrupted()) {
+            break;
+          }
+          final Tuple t;
+          try {
+            t = tupleStream.read();
+          } catch (final Exception e) {
+            buffers.getWriter().add((MapWriter) ew -> Tuple.EXCEPTION(e, true).writeMap(ew));
+            break;
+          }
+          if (t == null) {
+            break;
+          }
+          if (t.EOF && !t.EXCEPTION) {
+            break;
+          }
+          // use decorated writer to monitor the number of output writes
+          // and flush the output quickly in case of very few (reduced) output items
+          buffers.getWriter().add((MapWriter) ew -> t.writeMap(ew));
+          if (t.EXCEPTION && t.EOF) {
+            break;
+          }
+        }
+        return true;
+      });
       tupleStream.close();
     } else {
-      for (int count = 0; count < totalHits; ) {
-        int outDocsIndex = fillOutDocs(leaves, sortDoc, queue, outDocs);
-        count += (outDocsIndex + 1);
-        addDocsToItemWriter(leaves, writer, outDocs, outDocsIndex);
-      }
+      buffers.run(() -> {
+        // get the initial buffer
+        log.debug("--- writer init exchanging from empty");
+        buffers.exchangeBuffers();
+        ExportBuffers.Buffer buffer = buffers.getOutputBuffer();
+        log.debug("--- writer init got {}", buffer);
+        while (buffer.outDocsIndex != ExportBuffers.Buffer.NO_MORE_DOCS) {
+          if (Thread.currentThread().isInterrupted()) {
+            log.debug("--- writer interrupted");
+            break;
+          }
+          Timer.Context timerContext = writeOutputBufferTimer.time();
+          try {
+            for (int i = buffer.outDocsIndex; i >= 0; --i) {
+              // we're using the raw writer here because there's no potential
+              // reduction in the number of output items, unlike when using
+              // streaming expressions
+              final SortDoc currentDoc = buffer.outDocs[i];
+              writer.add((MapWriter) ew -> writeDoc(currentDoc, leaves, ew, fieldWriters));
+            }
+          } finally {
+            timerContext.stop();
+          }
+          log.debug("--- writer exchanging from {}", buffer);
+          timerContext = writerWaitTimer.time();
+          try {
+            buffers.exchangeBuffers();
+          } finally {
+            timerContext.stop();
+          }
+          buffer = buffers.getOutputBuffer();
+          log.debug("--- writer got {}", buffer);
+        }
+        return true;
+      });
     }
   }
 
-  private int fillOutDocs(List<LeafReaderContext> leaves, SortDoc sortDoc,
-                          SortQueue sortQueue, SortDoc[] outDocs) throws IOException {
+  void fillOutDocs(List<LeafReaderContext> leaves, SortDoc sortDoc,
+                          SortQueue sortQueue, ExportBuffers.Buffer buffer) throws IOException {
     identifyLowestSortingUnexportedDocs(leaves, sortDoc, sortQueue);
-    return transferBatchToArrayForOutput(sortQueue, outDocs);
+    transferBatchToBufferForOutput(sortQueue, leaves, buffer);
   }
 
   void writeDoc(SortDoc sortDoc,
                           List<LeafReaderContext> leaves,
-                          EntryWriter ew) throws IOException {
-
+                          EntryWriter ew, FieldWriter[] writers) throws IOException {
     int ord = sortDoc.ord;
-    FixedBitSet set = sets[ord];
-    set.clear(sortDoc.docId);
     LeafReaderContext context = leaves.get(ord);
     int fieldIndex = 0;
-    for (FieldWriter fieldWriter : fieldWriters) {
+    for (FieldWriter fieldWriter : writers) {
       if (fieldWriter.write(sortDoc, context.reader(), ew, fieldIndex)) {
         ++fieldIndex;
       }
     }
   }
 
-  protected FieldWriter[] getFieldWriters(String[] fields, SolrIndexSearcher searcher) throws IOException {
+  public FieldWriter[] getFieldWriters(String[] fields, SolrIndexSearcher searcher) throws IOException {
     IndexSchema schema = searcher.getSchema();
     FieldWriter[] writers = new FieldWriter[fields.length];
     for (int i = 0; i < fields.length; i++) {
@@ -612,7 +526,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
     return writers;
   }
 
-  private SortDoc getSortDoc(SolrIndexSearcher searcher, SortField[] sortFields) throws IOException {
+  SortDoc getSortDoc(SolrIndexSearcher searcher, SortField[] sortFields) throws IOException {
     SortValue[] sortValues = new SortValue[sortFields.length];
     IndexSchema schema = searcher.getSchema();
     for (int i = 0; i < sortFields.length; ++i) {
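
The ExportWriter javadoc above describes the core "stream sorting" loop: each pass scans the bitset of unexported matches, keeps the lowest-sorting batch in a bounded priority queue, clears those docs' bits, and emits the batch. A self-contained sketch of that idea over plain ints (java.util.BitSet and PriorityQueue stand in for FixedBitSet and SortQueue; all names are illustrative):

    import java.util.BitSet;
    import java.util.PriorityQueue;

    public class StreamSortSketch {
      public static void main(String[] args) {
        final int[] values = {42, 7, 99, 13, 7, 55, 1, 88}; // one sort value per doc
        BitSet matches = new BitSet();
        matches.set(0, values.length); // all docs match and are unexported
        int batchSize = 3;
        while (!matches.isEmpty()) {
          // bounded max-heap: after the scan it holds the batchSize smallest docs
          PriorityQueue<Integer> batch = new PriorityQueue<>((a, b) -> values[b] - values[a]);
          for (int doc = matches.nextSetBit(0); doc >= 0; doc = matches.nextSetBit(doc + 1)) {
            if (batch.size() < batchSize) {
              batch.add(doc);
            } else if (values[doc] < values[batch.peek()]) {
              batch.poll(); // evict the current largest of the batch
              batch.add(doc);
            }
          }
          int[] out = new int[batch.size()];
          for (int i = out.length - 1; i >= 0; i--) {
            out[i] = batch.poll(); // poll() yields largest-first, so fill backwards
          }
          for (int doc : out) {
            matches.clear(doc); // mark as sent, like sets[ord].clear(docId)
            System.out.println("export doc " + doc + " value " + values[doc]);
          }
        }
      }
    }

Note also that the batch size is now a per-request knob: the new batchSize parameter (BATCH_SIZE_PARAM) defaults to DEFAULT_BATCH_SIZE = 30000, so for example a request such as /solr/collection/export?q=*:*&sort=id+asc&fl=id&batchSize=50000 should trade memory for fewer passes.
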
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java
new file mode 100644
index 0000000..9ec0f4a
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.export;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.TimeoutException;
+
+import com.codahale.metrics.Timer;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.StreamContext;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.IteratorWriter;
+import org.apache.solr.common.MapWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stream implementation that supports 'expr' streaming in the export writer.
+ * <p>Note: this class is made public only to allow access from {@link org.apache.solr.handler.ExportHandler};
+ * it should be treated as an internal implementation detail.</p>
+ * @lucene.experimental
+ */
+public class ExportWriterStream extends TupleStream implements Expressible {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  final TupleEntryWriter tupleEntryWriter = new TupleEntryWriter();
+  StreamContext context;
+  StreamComparator streamComparator;
+  int pos = -1;
+  ExportBuffers exportBuffers;
+  ExportBuffers.Buffer buffer;
+  Timer.Context writeOutputTimerContext;
+
+  private static final class TupleEntryWriter implements EntryWriter {
+    Tuple tuple;
+
+    @Override
+    public EntryWriter put(CharSequence k, Object v) throws IOException {
+      if (v instanceof IteratorWriter) {
+        List<Object> lst = new ArrayList<>();
+        ((IteratorWriter)v).toList(lst);
+        v = lst;
+      } else if (v instanceof MapWriter) {
+        Map<String, Object> map = new HashMap<>();
+        ((MapWriter)v).toMap(map);
+        v = map;
+      }
+      tuple.put(k.toString(), v);
+      return this;
+    }
+  }
+
+  public ExportWriterStream(StreamExpression expression, StreamFactory factory) throws IOException {
+    streamComparator = parseComp(factory.getDefaultSort());
+  }
+
+  /**
+   * NOTE: this context must contain an instance of {@link ExportBuffers} under the
+   * {@link ExportBuffers#EXPORT_BUFFERS_KEY} key.
+   */
+  @Override
+  public void setStreamContext(StreamContext context) {
+    this.context = context;
+  }
+
+  @Override
+  public List<TupleStream> children() {
+    return null;
+  }
+
+  private StreamComparator parseComp(String sort) throws IOException {
+
+    String[] sorts = sort.split(",");
+    StreamComparator[] comps = new StreamComparator[sorts.length];
+    for (int i = 0; i < sorts.length; i++) {
+      String s = sorts[i];
+
+      String[] spec = s.trim().split("\\s+"); //This should take into account spaces in the sort spec.
+
+      if (spec.length != 2) {
+        throw new IOException("Invalid sort spec:" + s);
+      }
+
+      String fieldName = spec[0].trim();
+      String order = spec[1].trim();
+
+      comps[i] = new FieldComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING);
+    }
+
+    if (comps.length > 1) {
+      return new MultipleFieldComparator(comps);
+    } else {
+      return comps[0];
+    }
+  }
+
+  @Override
+  public void open() throws IOException {
+    exportBuffers = (ExportBuffers) context.get(ExportBuffers.EXPORT_BUFFERS_KEY);
+    buffer = exportBuffers.getOutputBuffer();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (writeOutputTimerContext != null) {
+      writeOutputTimerContext.stop();
+    }
+    exportBuffers = null;
+  }
+
+  @Override
+  public Tuple read() throws IOException {
+    Tuple res = null;
+    if (pos < 0) {
+      if (writeOutputTimerContext != null) {
+        writeOutputTimerContext.stop();
+        writeOutputTimerContext = null;
+      }
+      try {
+        buffer.outDocsIndex = ExportBuffers.Buffer.EMPTY;
+        log.debug("--- ews exchange empty buffer {}", buffer);
+        boolean exchanged = false;
+        while (!exchanged) {
+          Timer.Context timerContext = exportBuffers.getWriterWaitTimer().time();
+          try {
+            exportBuffers.exchangeBuffers();
+            exchanged = true;
+          } catch (TimeoutException e) {
+            log.debug("--- ews timeout loop");
+            if (exportBuffers.isShutDown()) {
+              log.debug("--- ews - the other end is shutdown, returning EOF");
+              res = Tuple.EOF();
+              break;
+            }
+            continue;
+          } catch (InterruptedException e) {
+            log.debug("--- ews interrupted");
+            exportBuffers.error(e);
+            res = Tuple.EXCEPTION(e, true);
+            break;
+          } catch (BrokenBarrierException e) {
+            if (exportBuffers.getError() != null) {
+              res = Tuple.EXCEPTION(exportBuffers.getError(), true);
+            } else {
+              res = Tuple.EXCEPTION(e, true);
+            }
+            break;
+          } finally {
+            timerContext.stop();
+          }
+        }
+      } catch (InterruptedException e) {
+        log.debug("--- ews interrupt");
+        exportBuffers.error(e);
+        res = Tuple.EXCEPTION(e, true);
+      } catch (Exception e) {
+        log.debug("--- ews exception", e);
+        exportBuffers.error(e);
+        res = Tuple.EXCEPTION(e, true);
+      }
+      buffer = exportBuffers.getOutputBuffer();
+      if (buffer == null) {
+        // guard against NPE: no buffer means the producer already shut down
+        res = Tuple.EOF();
+      } else if (buffer.outDocsIndex == ExportBuffers.Buffer.NO_MORE_DOCS) {
+        log.debug("--- ews EOF");
+        res = Tuple.EOF();
+      } else {
+        pos = buffer.outDocsIndex;
+        log.debug("--- ews new pos={}", pos);
+      }
+    }
+    if (pos < 0) {
+      log.debug("--- ews EOF?");
+      res = Tuple.EOF();
+    }
+    if (res != null) {
+      // only errors or EOF assigned result so far
+      if (writeOutputTimerContext != null) {
+        writeOutputTimerContext.stop();
+      }
+      return res;
+    }
+    if (writeOutputTimerContext == null) {
+      writeOutputTimerContext = exportBuffers.getWriteOutputBufferTimer().time();
+    }
+    SortDoc sortDoc = buffer.outDocs[pos];
+    tupleEntryWriter.tuple = new Tuple();
+    exportBuffers.exportWriter.writeDoc(sortDoc, exportBuffers.leaves, tupleEntryWriter, exportBuffers.exportWriter.fieldWriters);
+    pos--;
+    return tupleEntryWriter.tuple;
+  }
+
+  @Override
+  public StreamComparator getStreamSort() {
+    return streamComparator;
+  }
+
+  @Override
+  public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+    return expression;
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+    return new StreamExplanation(getStreamNodeId().toString())
+        .withFunctionName("input")
+        .withImplementingClass(this.getClass().getName())
+        .withExpressionType(Explanation.ExpressionType.STREAM_SOURCE)
+        .withExpression("--non-expressible--");
+  }
+}
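
ExportWriterStream is registered as the "input" function (see the ExportHandler diff above), so a streaming expression sent to /export reads tuples straight out of the shared ExportBuffers. To its consumer it is an ordinary TupleStream; a hedged sketch of the standard open/read/close contract, mirroring the loop ExportWriter.writeDocs() runs for the 'expr' case (the class and method names here are hypothetical):

    import org.apache.solr.client.solrj.io.Tuple;
    import org.apache.solr.client.solrj.io.stream.TupleStream;

    // Minimal consumer of any TupleStream, including ExportWriterStream.
    public final class TupleStreamConsumer {
      public static long drain(TupleStream stream) throws Exception {
        long count = 0;
        stream.open();
        try {
          for (;;) {
            Tuple t = stream.read();
            if (t == null || (t.EOF && !t.EXCEPTION)) {
              break; // normal end of stream
            }
            count++; // a real consumer would write or transform the tuple here
            if (t.EXCEPTION && t.EOF) {
              break; // terminal error tuple: emitted, then stop
            }
          }
        } finally {
          stream.close();
        }
        return count;
      }
    }
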
diff --git a/solr/core/src/java/org/apache/solr/handler/export/LongValue.java b/solr/core/src/java/org/apache/solr/handler/export/LongValue.java
index dce365b..63794ad 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/LongValue.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/LongValue.java
@@ -25,10 +25,10 @@ import org.apache.lucene.index.NumericDocValues;
 
 public class LongValue implements SortValue {
 
+  final protected String field;
+  final protected LongComp comp;
   protected NumericDocValues vals;
-  protected String field;
   protected long currentValue;
-  protected LongComp comp;
   private int lastDocID;
   private boolean present;
 
diff --git a/solr/core/src/java/org/apache/solr/handler/export/PriorityQueue.java b/solr/core/src/java/org/apache/solr/handler/export/PriorityQueue.java
deleted file mode 100644
index 1552060..0000000
--- a/solr/core/src/java/org/apache/solr/handler/export/PriorityQueue.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.handler.export;
-
-import org.apache.lucene.util.ArrayUtil;
-
-public abstract class PriorityQueue<T> {
-  protected int size = 0;
-  protected final int maxSize;
-  private final T[] heap;
-
-  public PriorityQueue(int maxSize) {
-    this(maxSize, true);
-  }
-
-  public PriorityQueue(int maxSize, boolean prepopulate) {
-    final int heapSize;
-    if (0 == maxSize) {
-      // We allocate 1 extra to avoid if statement in top()
-      heapSize = 2;
-    } else {
-      if (maxSize > ArrayUtil.MAX_ARRAY_LENGTH) {
-        // Don't wrap heapSize to -1, in this case, which
-        // causes a confusing NegativeArraySizeException.
-        // Note that very likely this will simply then hit
-        // an OOME, but at least that's more indicative to
-        // caller that this values is too big.  We don't +1
-        // in this case, but it's very unlikely in practice
-        // one will actually insert this many objects into
-        // the PQ:
-        // Throw exception to prevent confusing OOME:
-        throw new IllegalArgumentException("maxSize must be <= " + ArrayUtil.MAX_ARRAY_LENGTH + "; got: " + maxSize);
-      } else {
-        // NOTE: we add +1 because all access to heap is
-        // 1-based not 0-based.  heap[0] is unused.
-        heapSize = maxSize + 1;
-      }
-    }
-    // T is unbounded type, so this unchecked cast works always:
-    @SuppressWarnings("unchecked") final T[] h = (T[]) new Object[heapSize];
-    this.heap = h;
-    this.maxSize = maxSize;
-
-    if (prepopulate) {
-      // If sentinel objects are supported, populate the queue with them
-      T sentinel = getSentinelObject();
-      if (sentinel != null) {
-        heap[1] = sentinel;
-        for (int i = 2; i < heap.length; i++) {
-          heap[i] = getSentinelObject();
-        }
-        size = maxSize;
-      }
-    }
-  }
-
-  /** Determines the ordering of objects in this priority queue.  Subclasses
-   *  must define this one method.
-   *  @return <code>true</code> iff parameter <tt>a</tt> is less than parameter <tt>b</tt>.
-   */
-  protected abstract boolean lessThan(T a, T b);
-
-
-  protected T getSentinelObject() {
-    return null;
-  }
-
-  /**
-   * Adds an Object to a PriorityQueue in log(size) time. If one tries to add
-   * more objects than maxSize from initialize an
-   *
-   * @return the new 'top' element in the queue.
-   */
-  public final T add(T element) {
-    size++;
-    heap[size] = element;
-    upHeap();
-    return heap[1];
-  }
-
-  /**
-   * Adds an Object to a PriorityQueue in log(size) time.
-   * It returns the object (if any) that was
-   * dropped off the heap because it was full. This can be
-   * the given parameter (in case it is smaller than the
-   * full heap's minimum, and couldn't be added), or another
-   * object that was previously the smallest value in the
-   * heap and now has been replaced by a larger one, or null
-   * if the queue wasn't yet full with maxSize elements.
-   */
-  public T insertWithOverflow(T element) {
-    if (size < maxSize) {
-      add(element);
-      return null;
-    } else if (size > 0 && !lessThan(element, heap[1])) {
-      T ret = heap[1];
-      heap[1] = element;
-      updateTop();
-      return ret;
-    } else {
-      return element;
-    }
-  }
-
-  /** Returns the least element of the PriorityQueue in constant time. */
-  public final T top() {
-    // We don't need to check size here: if maxSize is 0,
-    // then heap is length 2 array with both entries null.
-    // If size is 0 then heap[1] is already null.
-    return heap[1];
-  }
-
-  /** Removes and returns the least element of the PriorityQueue in log(size)
-   time. */
-  public final T pop() {
-    if (size > 0) {
-      T result = heap[1];       // save first value
-      heap[1] = heap[size];     // move last to first
-      heap[size] = null;        // permit GC of objects
-      size--;
-      downHeap();               // adjust heap
-      return result;
-    } else {
-      return null;
-    }
-  }
-
-  /**
-   * Should be called when the Object at top changes values. Still log(n) worst
-   * case, but it's at least twice as fast to
-   *
-   * <pre class="prettyprint">
-   * pq.top().change();
-   * pq.updateTop();
-   * </pre>
-   *
-   * instead of
-   *
-   * <pre class="prettyprint">
-   * o = pq.pop();
-   * o.change();
-   * pq.push(o);
-   * </pre>
-   *
-   * @return the new 'top' element.
-   */
-  public final T updateTop() {
-    downHeap();
-    return heap[1];
-  }
-
-  /** Returns the number of elements currently stored in the PriorityQueue. */
-  public final int size() {
-    return size;
-  }
-
-  /** Removes all entries from the PriorityQueue. */
-  public final void clear() {
-    for (int i = 0; i <= size; i++) {
-      heap[i] = null;
-    }
-    size = 0;
-  }
-
-  private final void upHeap() {
-    int i = size;
-    T node = heap[i];          // save bottom node
-    int j = i >>> 1;
-    while (j > 0 && lessThan(node, heap[j])) {
-      heap[i] = heap[j];       // shift parents down
-      i = j;
-      j = j >>> 1;
-    }
-    heap[i] = node;            // install saved node
-  }
-
-  private final void downHeap() {
-    int i = 1;
-    T node = heap[i];          // save top node
-    int j = i << 1;            // find smaller child
-    int k = j + 1;
-    if (k <= size && lessThan(heap[k], heap[j])) {
-      j = k;
-    }
-    while (j <= size && lessThan(heap[j], node)) {
-      heap[i] = heap[j];       // shift up child
-      i = j;
-      j = i << 1;
-      k = j + 1;
-      if (k <= size && lessThan(heap[k], heap[j])) {
-        j = k;
-      }
-    }
-    heap[i] = node;            // install saved node
-  }
-
-  /** This method returns the internal heap array as Object[].
-   * @lucene.internal
-   */
-  public final Object[] getHeapArray() {
-    return (Object[]) heap;
-  }
-}
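
The private PriorityQueue<T> copy above is removed; its heap code moves directly into SortQueue (below), dropping the generics and sentinel machinery. The top()/updateTop() idiom its javadoc describes survives in identifyLowestSortingUnexportedDocs(): instead of pop-and-push, the caller mutates the current least element in place and sifts it down once. A small illustrative sketch of that idiom on a 1-based int min-heap (not part of the commit):

    // updateTop: mutate the root, then one downHeap -- cheaper than
    // a pop (downHeap) followed by an add (upHeap).
    public class UpdateTopSketch {
      final int[] heap;
      int size;

      UpdateTopSketch(int maxSize) { heap = new int[maxSize + 1]; } // heap[0] unused

      void add(int v) {
        heap[++size] = v;
        for (int i = size; i > 1 && heap[i] < heap[i >>> 1]; i >>>= 1) {
          int t = heap[i]; heap[i] = heap[i >>> 1]; heap[i >>> 1] = t;
        }
      }

      int top() { return heap[1]; }

      // replace the least element and restore heap order with one downHeap
      int updateTop(int newValue) {
        heap[1] = newValue;
        int i = 1;
        while (true) {
          int j = i << 1;
          if (j + 1 <= size && heap[j + 1] < heap[j]) j++;
          if (j > size || heap[i] <= heap[j]) break;
          int t = heap[i]; heap[i] = heap[j]; heap[j] = t;
          i = j;
        }
        return heap[1];
      }

      public static void main(String[] args) {
        UpdateTopSketch q = new UpdateTopSketch(3);
        q.add(5); q.add(2); q.add(9);
        // keep the 3 largest of a stream: replace the current minimum when beaten
        for (int v : new int[]{7, 1, 8}) {
          if (v > q.top()) q.updateTop(v);
        }
        System.out.println("min of kept = " + q.top()); // 7
      }
    }
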
diff --git a/solr/core/src/java/org/apache/solr/handler/export/QuadValueSortDoc.java b/solr/core/src/java/org/apache/solr/handler/export/QuadValueSortDoc.java
index e4f5a02..b8599c7 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/QuadValueSortDoc.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/QuadValueSortDoc.java
@@ -50,6 +50,7 @@ class QuadValueSortDoc extends TripleValueSortDoc {
   public void reset() {
     this.docId = -1;
     this.docBase = -1;
+    this.ord = -1;
     value1.reset();
     value2.reset();
     value3.reset();
diff --git a/solr/core/src/java/org/apache/solr/handler/export/SingleValueSortDoc.java b/solr/core/src/java/org/apache/solr/handler/export/SingleValueSortDoc.java
index 164c07b..0bd3a4a 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/SingleValueSortDoc.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/SingleValueSortDoc.java
@@ -32,11 +32,6 @@ class SingleValueSortDoc extends SortDoc {
     return null;
   }
 
-  @Override
-  public SortValue[] getSortValues() {
-    return new SortValue[] { value1 };
-  }
-
   public void setNextReader(LeafReaderContext context) throws IOException {
     this.ord = context.ord;
     this.docBase = context.docBase;
@@ -46,6 +41,7 @@ class SingleValueSortDoc extends SortDoc {
   public void reset() {
     this.docId = -1;
     this.docBase = -1;
+    this.ord = -1;
     this.value1.reset();
   }
 
@@ -88,7 +84,7 @@ class SingleValueSortDoc extends SortDoc {
   }
 
   public String toString() {
-    return docId+":"+value1.toString();
+    return ord + ":" + docBase + ":" + docId + ":val=" + value1.toString();
   }
 
 }
diff --git a/solr/core/src/java/org/apache/solr/handler/export/SortDoc.java b/solr/core/src/java/org/apache/solr/handler/export/SortDoc.java
index 292e795..d893bd1 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/SortDoc.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/SortDoc.java
@@ -45,10 +45,6 @@ class SortDoc {
     return null;
   }
 
-  public SortValue[] getSortValues() {
-    return sortValues;
-  }
-
   public void setNextReader(LeafReaderContext context) throws IOException {
     this.ord = context.ord;
     this.docBase = context.docBase;
@@ -60,6 +56,7 @@ class SortDoc {
   public void reset() {
     this.docId = -1;
     this.docBase = -1;
+    this.ord = -1;
     for (SortValue value : sortValues) {
       value.reset();
     }
@@ -67,7 +64,7 @@ class SortDoc {
 
   public void setValues(int docId) throws IOException {
     this.docId = docId;
-    for(SortValue sortValue : sortValues) {
+    for (SortValue sortValue : sortValues) {
       sortValue.setCurrentValue(docId);
     }
   }
@@ -77,14 +74,14 @@ class SortDoc {
     this.ord = sortDoc.ord;
     this.docBase = sortDoc.docBase;
     SortValue[] vals = sortDoc.sortValues;
-    for(int i=0; i<vals.length; i++) {
+    for (int i = 0; i < vals.length; i++) {
       sortValues[i].setCurrentValue(vals[i]);
     }
   }
 
   public SortDoc copy() {
     SortValue[] svs = new SortValue[sortValues.length];
-    for(int i=0; i<sortValues.length; i++) {
+    for (int i = 0; i < sortValues.length; i++) {
       svs[i] = sortValues[i].copy();
     }
 
@@ -92,12 +89,12 @@ class SortDoc {
   }
 
   public boolean lessThan(Object o) {
-    if(docId == -1) {
+    if (docId == -1) {
       return true;
     }
     SortDoc sd = (SortDoc)o;
     SortValue[] sortValues1 = sd.sortValues;
-    for(int i=0; i<sortValues.length; i++) {
+    for (int i = 0; i < sortValues.length; i++) {
       int comp = sortValues[i].compareTo(sortValues1[i]);
       if (comp < 0) {
         return true;
@@ -105,12 +102,12 @@ class SortDoc {
         return false;
       }
     }
-    return docId+docBase > sd.docId+sd.docBase; //index order
+    return docId + docBase > sd.docId + sd.docBase; //index order
   }
 
   public int compareTo(Object o) {
     SortDoc sd = (SortDoc)o;
-    for (int i=0; i<sortValues.length; i++) {
+    for (int i = 0; i < sortValues.length; i++) {
       int comp = sortValues[i].compareTo(sd.sortValues[i]);
       if (comp != 0) {
         return comp;
@@ -122,8 +119,8 @@ class SortDoc {
 
   public String toString() {
     StringBuilder builder = new StringBuilder();
-    builder.append("docId: ").append(docId).append("; ");
-    for (int i=0; i < sortValues.length; i++) {
+    builder.append(ord).append(':').append(docBase).append(':').append(docId).append("; ");
+    for (int i = 0; i < sortValues.length; i++) {
       builder.append("value").append(i).append(": ").append(sortValues[i]).append(", ");
     }
     return builder.toString();
diff --git a/solr/core/src/java/org/apache/solr/handler/export/SortQueue.java b/solr/core/src/java/org/apache/solr/handler/export/SortQueue.java
index cf99d53..3189879 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/SortQueue.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/SortQueue.java
@@ -17,22 +17,53 @@
 
 package org.apache.solr.handler.export;
 
-class SortQueue extends PriorityQueue<SortDoc> {
+import org.apache.lucene.util.ArrayUtil;
 
-  private SortDoc proto;
-  private Object[] cache;
+/**
+ * Specialized class that reuses most of the code from {@link org.apache.lucene.util.PriorityQueue}
+ * but contains optimizations for use by the /export handler.
+ */
+final class SortQueue {
+
+  protected int size = 0;
+  protected final int maxSize;
+  private final SortDoc[] heap;
+  private final SortDoc proto;
+  private SortDoc[] cache;
 
-  public SortQueue(int len, SortDoc proto) {
-    super(len);
+  public SortQueue(int maxSize, SortDoc proto) {
     this.proto = proto;
+    final int heapSize;
+    if (0 == maxSize) {
+      // We allocate 1 extra to avoid if statement in top()
+      heapSize = 2;
+    } else {
+      if (maxSize > ArrayUtil.MAX_ARRAY_LENGTH) {
+        // Don't wrap heapSize to -1, in this case, which
+        // causes a confusing NegativeArraySizeException.
+        // Note that very likely this will simply then hit
+        // an OOME, but at least that's more indicative to
+        // caller that this value is too big.  We don't +1
+        // in this case, but it's very unlikely in practice
+        // one will actually insert this many objects into
+        // the PQ:
+        // Throw exception to prevent confusing OOME:
+        throw new IllegalArgumentException("maxSize must be <= " + ArrayUtil.MAX_ARRAY_LENGTH + "; got: " + maxSize);
+      } else {
+        // NOTE: we add +1 because all access to heap is
+        // 1-based not 0-based.  heap[0] is unused.
+        heapSize = maxSize + 1;
+      }
+    }
+    this.heap = new SortDoc[heapSize];
+    this.maxSize = maxSize;
   }
 
-  protected boolean lessThan(SortDoc t1, SortDoc t2) {
+  private static final boolean lessThan(SortDoc t1, SortDoc t2) {
     return t1.lessThan(t2);
   }
 
   protected void populate() {
-    Object[] heap = getHeapArray();
     cache = new SortDoc[heap.length];
     for (int i = 1; i < heap.length; i++) {
       cache[i] = heap[i] = proto.copy();
@@ -41,12 +72,118 @@ class SortQueue extends PriorityQueue<SortDoc> {
   }
 
   protected void reset() {
-    Object[] heap = getHeapArray();
-    if(cache != null) {
+    if (cache != null) {
       System.arraycopy(cache, 1, heap, 1, heap.length-1);
       size = maxSize;
     } else {
       populate();
     }
   }
+
+  // ==================
+  /**
+   * Adds an Object to a PriorityQueue in log(size) time. If one tries to add
+   * more objects than the maxSize passed to the constructor, an
+   * {@link ArrayIndexOutOfBoundsException} is thrown.
+   *
+   * @return the new 'top' element in the queue.
+   */
+  public final SortDoc add(SortDoc element) {
+    size++;
+    heap[size] = element;
+    upHeap();
+    return heap[1];
+  }
+
+  /** Returns the least element of the PriorityQueue in constant time. */
+  public final SortDoc top() {
+    // We don't need to check size here: if maxSize is 0,
+    // then heap is length 2 array with both entries null.
+    // If size is 0 then heap[1] is already null.
+    return heap[1];
+  }
+
+  /** Removes and returns the least element of the PriorityQueue in log(size)
+   time. */
+  public final SortDoc pop() {
+    if (size > 0) {
+      SortDoc result = heap[1];       // save first value
+      heap[1] = heap[size];     // move last to first
+      heap[size] = null;        // permit GC of objects
+      size--;
+      downHeap();               // adjust heap
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Should be called when the Object at top changes values. Still log(n) worst
+   * case, but it's at least twice as fast to
+   *
+   * <pre class="prettyprint">
+   * pq.top().change();
+   * pq.updateTop();
+   * </pre>
+   *
+   * instead of
+   *
+   * <pre class="prettyprint">
+   * o = pq.pop();
+   * o.change();
+   * pq.push(o);
+   * </pre>
+   *
+   * @return the new 'top' element.
+   */
+  public final SortDoc updateTop() {
+    downHeap();
+    return heap[1];
+  }
+
+  /** Returns the number of elements currently stored in the PriorityQueue. */
+  public final int size() {
+    return size;
+  }
+
+  /** Removes all entries from the PriorityQueue. */
+  public final void clear() {
+    for (int i = 0; i <= size; i++) {
+      heap[i] = null;
+    }
+    size = 0;
+  }
+
+  private final void upHeap() {
+    int i = size;
+    SortDoc node = heap[i];          // save bottom node
+    int j = i >>> 1;
+    while (j > 0 && lessThan(node, heap[j])) {
+      heap[i] = heap[j];       // shift parents down
+      i = j;
+      j = j >>> 1;
+    }
+    heap[i] = node;            // install saved node
+  }
+
+  private final void downHeap() {
+    int i = 1;
+    SortDoc node = heap[i];          // save top node
+    int j = i << 1;            // find smaller child
+    int k = j + 1;
+    if (k <= size && lessThan(heap[k], heap[j])) {
+      j = k;
+    }
+    while (j <= size && lessThan(heap[j], node)) {
+      heap[i] = heap[j];       // shift up child
+      i = j;
+      j = i << 1;
+      k = j + 1;
+      if (k <= size && lessThan(heap[k], heap[j])) {
+        j = k;
+      }
+    }
+    heap[i] = node;            // install saved node
+  }
+
 }
\ No newline at end of file
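The rewritten queue is a classic 1-based binary min-heap (heap[0] is unused), pre-filled once with copies of the proto SortDoc so that collection can overwrite the least element in place. A hypothetical sketch of the collect pattern it enables; only add()/top()/updateTop()/reset() come from the patch above, the setup and doc iteration are invented for illustration:

    // Hypothetical collect loop over candidate docids.
    SortQueue queue = new SortQueue(topN, protoSortDoc);
    queue.reset();                      // fills the heap with proto copies
    SortDoc least = queue.top();
    for (int docId : candidateDocIds) {
      protoSortDoc.setValues(docId);    // load the candidate's sort values
      if (least.lessThan(protoSortDoc)) {
        least.setValues(protoSortDoc);  // overwrite the least element in place
        least = queue.updateTop();      // restore heap order in log(n)
      }
    }

The updateTop() path is what makes the in-place overwrite pay off: it performs a single downHeap() instead of the pop()-then-add() pair an ordinary priority queue would require.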
diff --git a/solr/core/src/java/org/apache/solr/handler/export/StringFieldWriter.java b/solr/core/src/java/org/apache/solr/handler/export/StringFieldWriter.java
index b82c365..846eb08 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/StringFieldWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/StringFieldWriter.java
@@ -18,6 +18,8 @@
 package org.apache.solr.handler.export;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.lucene.index.DocValues;
 import org.apache.lucene.index.LeafReader;
@@ -32,6 +34,7 @@ import org.apache.solr.schema.FieldType;
 class StringFieldWriter extends FieldWriter {
   private String field;
   private FieldType fieldType;
+  private Map<Integer, SortedDocValues> lastDocValues = new HashMap<>();
   private CharsRefBuilder cref = new CharsRefBuilder();
   final ByteArrayUtf8CharSequence utf8 = new ByteArrayUtf8CharSequence(new byte[0], 0, 0) {
     @Override
@@ -61,7 +64,11 @@ class StringFieldWriter extends FieldWriter {
       }
     } else {
       // field is not part of 'sort' param, but part of 'fl' param
-      SortedDocValues vals = DocValues.getSorted(reader, this.field);
+      SortedDocValues vals = lastDocValues.get(sortDoc.ord);
+      if (vals == null || vals.docID() >= sortDoc.docId) {
+        vals = DocValues.getSorted(reader, this.field);
+        lastDocValues.put(sortDoc.ord, vals);
+      }
       if (vals.advance(sortDoc.docId) != sortDoc.docId) {
         return false;
       }
@@ -73,9 +80,9 @@ class StringFieldWriter extends FieldWriter {
       ew.put(this.field, utf8.reset(ref.bytes, ref.offset, ref.length, null));
     } else {
       String v = null;
-      if(sortValue != null) {
+      if (sortValue != null) {
         v = ((StringValue) sortValue).getLastString();
-        if(v == null) {
+        if (v == null) {
           fieldType.indexedToReadable(ref, cref);
           v = cref.toString();
           ((StringValue) sortValue).setLastString(v);
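The new lastDocValues map leans on the fact that Lucene DocValues iterators are forward-only: a cached per-segment SortedDocValues instance can be reused for the next document as long as it has not yet advanced to or past the target docid. A minimal sketch of the pattern, mirroring the names in the patch (the surrounding method is invented):

    // Reuse the per-segment iterator while it is still behind the target;
    // otherwise acquire a fresh one and cache it under the segment ord.
    SortedDocValues vals = lastDocValues.get(segmentOrd);
    if (vals == null || vals.docID() >= targetDocId) {
      vals = DocValues.getSorted(leafReader, fieldName);
      lastDocValues.put(segmentOrd, vals);
    }
    if (vals.advance(targetDocId) == targetDocId) {
      BytesRef ref = vals.binaryValue();  // value for the target document
      // ... write the value ...
    }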
diff --git a/solr/core/src/java/org/apache/solr/handler/export/StringValue.java b/solr/core/src/java/org/apache/solr/handler/export/StringValue.java
index fc70565..e3a36d4 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/StringValue.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/StringValue.java
@@ -29,15 +29,16 @@ import org.apache.lucene.util.LongValues;
 
 class StringValue implements SortValue {
 
-  protected SortedDocValues globalDocValues;
+  private final SortedDocValues globalDocValues;
+
+  private final OrdinalMap ordinalMap;
+  private final String field;
+  private final IntComp comp;
 
-  protected OrdinalMap ordinalMap;
   protected LongValues toGlobal = LongValues.IDENTITY; // this segment to global ordinal. NN;
   protected SortedDocValues docValues;
 
-  protected String field;
   protected int currentOrd;
-  protected IntComp comp;
   protected int lastDocID;
   private boolean present;
 
@@ -50,6 +51,8 @@ class StringValue implements SortValue {
     this.docValues = globalDocValues;
     if (globalDocValues instanceof MultiDocValues.MultiSortedDocValues) {
       this.ordinalMap = ((MultiDocValues.MultiSortedDocValues) globalDocValues).mapping;
+    } else {
+      this.ordinalMap = null;
     }
     this.field = field;
     this.comp = comp;
@@ -66,7 +69,8 @@ class StringValue implements SortValue {
   }
 
   public StringValue copy() {
-    return new StringValue(globalDocValues, field, comp);
+    StringValue copy = new StringValue(globalDocValues, field, comp);
+    return copy;
   }
 
   public void setCurrentValue(int docId) throws IOException {
diff --git a/solr/core/src/java/org/apache/solr/handler/export/TripleValueSortDoc.java b/solr/core/src/java/org/apache/solr/handler/export/TripleValueSortDoc.java
index 3e8bfd8..c68c1a8 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/TripleValueSortDoc.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/TripleValueSortDoc.java
@@ -47,6 +47,7 @@ class TripleValueSortDoc extends DoubleValueSortDoc {
   public void reset() {
     this.docId = -1;
     this.docBase = -1;
+    this.ord = -1;
     value1.reset();
     value2.reset();
     value3.reset();
diff --git a/solr/core/src/test/org/apache/solr/handler/export/TestExportWriter.java b/solr/core/src/test/org/apache/solr/handler/export/TestExportWriter.java
index 2d51647..935a364 100644
--- a/solr/core/src/test/org/apache/solr/handler/export/TestExportWriter.java
+++ b/solr/core/src/test/org/apache/solr/handler/export/TestExportWriter.java
@@ -32,6 +32,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.lucene.util.TestUtil;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.StreamParams;
 import org.apache.solr.common.util.SuppressForbidden;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.index.LogDocMergePolicyFactory;
@@ -709,8 +710,8 @@ public class TestExportWriter extends SolrTestCaseJ4 {
   }
 
   private void createLargeIndex() throws Exception {
-    int BATCH_SIZE = 1000;
-    int NUM_BATCHES = 100;
+    int BATCH_SIZE = 5000;
+    int NUM_BATCHES = 20;
     SolrInputDocument[] docs = new SolrInputDocument[BATCH_SIZE];
     for (int i = 0; i < NUM_BATCHES; i++) {
       for (int j = 0; j < BATCH_SIZE; j++) {
@@ -720,7 +721,7 @@ public class TestExportWriter extends SolrTestCaseJ4 {
             "random_i_p", String.valueOf(random().nextInt(BATCH_SIZE)),
             "sortabledv", TestUtil.randomSimpleString(random(), 2, 3),
             "sortabledv_udvas", String.valueOf(random().nextInt(100)),
-            "small_i_p", String.valueOf((i + j) % 7)
+            "small_i_p", String.valueOf((i + j) % 37)
             );
       }
       updateJ(jsonAdd(docs), null);
@@ -758,6 +759,30 @@ public class TestExportWriter extends SolrTestCaseJ4 {
       }
       assertTrue("missing value " + i + " in results", found);
     }
+    req = req("q", "*:*", "qt", "/export", "fl", "id,sortabledv_udvas,small_i_p", "sort", "sortabledv_udvas asc", "expr", "rollup(input(),over=\"sortabledv_udvas\", sum(small_i_p),avg(small_i_p),min(small_i_p),count(*))");
+    rsp = h.query(req);
+    rspMap = mapper.readValue(rsp, HashMap.class);
+    docs = (List<Map<String, Object>>) Utils.getObjectByPath(rspMap, false, "/response/docs");
+    assertNotNull("missing document results: " + rspMap, docs);
+    assertEquals("wrong number of unique docs", 100, docs.size());
+    for (Map<String, Object> doc : docs) {
+      assertNotNull("missing sum: " + doc, doc.get("sum(small_i_p)"));
+      assertEquals(18000.0, ((Number)doc.get("sum(small_i_p)")).doubleValue(), 2500.0);
+      assertNotNull("missing avg: " + doc, doc.get("avg(small_i_p)"));
+      assertEquals(18.0, ((Number)doc.get("avg(small_i_p)")).doubleValue(), 2.5);
+      assertNotNull("missing count: " + doc, doc.get("count(*)"));
+      assertEquals(1000.0, ((Number)doc.get("count(*)")).doubleValue(), 200.0);
+    }
+    // try invalid field types
+    req = req("q", "*:*", "qt", "/export", "fl", "id,sortabledv,small_i_p", "sort", "sortabledv asc", "expr", "unique(input(),over=\"sortabledv\")");
+    rsp = h.query(req);
+    rspMap = mapper.readValue(rsp, HashMap.class);
+    assertEquals("wrong response status", 400, ((Number)Utils.getObjectByPath(rspMap, false, "/responseHeader/status")).intValue());
+    docs = (List<Map<String, Object>>) Utils.getObjectByPath(rspMap, false, "/response/docs");
+    assertEquals("wrong number of docs", 1, docs.size());
+    Map<String, Object> doc = docs.get(0);
+    assertTrue("doc doesn't have exception", doc.containsKey(StreamParams.EXCEPTION));
+    assertTrue("wrong exception message", doc.get(StreamParams.EXCEPTION).toString().contains("Must have useDocValuesAsStored='true'"));
   }
 
   private void validateSort(int numDocs) throws Exception {
diff --git a/solr/solr-ref-guide/src/exporting-result-sets.adoc b/solr/solr-ref-guide/src/exporting-result-sets.adoc
index 8a072f2..674ab96 100644
--- a/solr/solr-ref-guide/src/exporting-result-sets.adoc
+++ b/solr/solr-ref-guide/src/exporting-result-sets.adoc
@@ -39,6 +39,8 @@ You can use `/export` to make requests to export the result set of a query.
 
 All queries must include `sort` and `fl` parameters, or the query will return an error. Filter queries are also supported.
 
+The optional `batchSize` parameter determines the size of the internal buffers used for partial results. The default value is 30000. Users may want to specify smaller values to limit memory use (at the cost of degraded performance), or larger values to improve export performance; the relationship is not linear, and larger values do not bring proportionally larger performance increases.
+
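As a usage illustration of the new parameter (a hypothetical request, not part of the patch; host, collection, and fields are made up):

    http://localhost:8983/solr/techproducts/export?q=*:*&sort=id+asc&fl=id,name&batchSize=10000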
 The supported response writers are `json` and `javabin`. For backward compatibility reasons `wt=xsort` is also supported as input, but `wt=xsort` behaves same as `wt=json`. The default output format is `json`.
 
 Here is an example of an export request of some indexed log data:
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
index 5d24406..24e1951 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
@@ -17,6 +17,8 @@
 package org.apache.solr.client.solrj.io;
 
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Date;
@@ -94,6 +96,10 @@ public class Tuple implements Cloneable, MapWriter {
       throw new RuntimeException("must have a matching number of key-value pairs");
     }
     for (int i = 0; i < fields.length; i += 2) {
+      // skip entries with a null key
+      if (fields[i] == null) {
+        continue;
+      }
       put(fields[i], fields[i + 1]);
     }
   }
@@ -330,4 +336,15 @@ public class Tuple implements Cloneable, MapWriter {
     }
     return tuple;
   }
+
+  /**
+   * Create a new empty tuple marked as EXCEPTION and optionally EOF.
+   * @param t exception - the full stack trace will be used as the exception message
+   * @param eof if true, the tuple will also be marked as EOF
+   */
+  public static Tuple EXCEPTION(Throwable t, boolean eof) {
+    StringWriter sw = new StringWriter();
+    t.printStackTrace(new PrintWriter(sw));
+    return EXCEPTION(sw.toString(), eof);
+  }
 }
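A hypothetical usage sketch for the new overload: a stream implementation that hits an error can surface the full stack trace to the client and terminate the stream in one call (the surrounding read() method and nextTuple() helper are invented):

    public Tuple read() {
      try {
        return nextTuple();  // invented: fetch the next result
      } catch (IOException e) {
        // The EXCEPTION field carries the stack trace; eof=true ends the stream.
        return Tuple.EXCEPTION(e, true);
      }
    }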