Posted to commits@lucene.apache.org by ab...@apache.org on 2020/06/15 08:56:44 UTC

[lucene-solr] branch jira/solr-14537 updated: SOLR-14537: Refactor for clarity - move some inner classes to top level.

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch jira/solr-14537
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/jira/solr-14537 by this push:
     new ea97e7f  SOLR-14537: Refactor for clarity - move some inner classes to top level.
ea97e7f is described below

commit ea97e7f9d23227dfe5e0afef0c61ece4918dca0c
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Mon Jun 15 10:55:47 2020 +0200

    SOLR-14537: Refactor for clarity - move some inner classes to top level.
---
 .../org/apache/solr/handler/ExportHandler.java     |   3 +-
 .../apache/solr/handler/export/ExportBuffers.java  | 259 +++++++++++++++
 .../apache/solr/handler/export/ExportWriter.java   | 350 ++-------------------
 .../solr/handler/export/ExportWriterStream.java    | 157 +++++++++
 4 files changed, 437 insertions(+), 332 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/handler/ExportHandler.java b/solr/core/src/java/org/apache/solr/handler/ExportHandler.java
index 04800a3..a89eeae 100644
--- a/solr/core/src/java/org/apache/solr/handler/ExportHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/ExportHandler.java
@@ -34,6 +34,7 @@ import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.component.SearchHandler;
 import org.apache.solr.handler.export.ExportWriter;
+import org.apache.solr.handler.export.ExportWriterStream;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
 import org.slf4j.Logger;
@@ -66,7 +67,7 @@ public class ExportHandler extends SearchHandler {
       for (String function : forbiddenStreams) {
         this.withoutFunctionName(function);
       }
-      this.withFunctionName("input", ExportWriter.ExportWriterStream.class);
+      this.withFunctionName("input", ExportWriterStream.class);
     }
   }
 
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java b/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
new file mode 100644
index 0000000..5525f81
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.export;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Exchanger;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.Sort;
+import org.apache.solr.common.IteratorWriter;
+import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class implementing a "double buffering" producer / consumer scheme,
+ * where a filler task and a writer swap buffers through an {@link Exchanger}.
+ */
+class ExportBuffers {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  static final long EXCHANGE_TIMEOUT_SECONDS = 600;
+  static final String EXPORT_BUFFERS_KEY = "__eb__";
+
+  final Buffer bufferOne;
+  final Buffer bufferTwo;
+  final Exchanger<Buffer> exchanger = new Exchanger<>();
+  final SortDoc[] outDocs;
+  final List<LeafReaderContext> leaves;
+  final ExportWriter exportWriter;
+  final OutputStream os;
+  final IteratorWriter.ItemWriter rawWriter;
+  final IteratorWriter.ItemWriter writer;
+  final int totalHits;
+  Buffer fillBuffer;
+  Buffer outputBuffer;
+  Runnable filler;
+  ExecutorService service;
+  LongAdder outputCounter = new LongAdder();
+
+  ExportBuffers(ExportWriter exportWriter, List<LeafReaderContext> leaves, SolrIndexSearcher searcher,
+                OutputStream os, IteratorWriter.ItemWriter rawWriter, Sort sort, int queueSize, int totalHits) {
+    this.exportWriter = exportWriter;
+    this.leaves = leaves;
+    this.os = os;
+    this.rawWriter = rawWriter;
+    this.writer = new IteratorWriter.ItemWriter() {
+      @Override
+      public IteratorWriter.ItemWriter add(Object o) throws IOException {
+        rawWriter.add(o);
+        outputCounter.increment();
+        return this;
+      }
+    };
+    this.outDocs = new SortDoc[queueSize];
+    this.bufferOne = new Buffer();
+    this.bufferTwo = new Buffer();
+    this.totalHits = totalHits;
+    fillBuffer = bufferOne;
+    outputBuffer = bufferTwo;
+    filler = () -> {
+      try {
+        SortDoc sortDoc = exportWriter.getSortDoc(searcher, sort.getSort());
+        SortQueue queue = new SortQueue(queueSize, sortDoc);
+        long lastOutputCounter = 0;
+        for (int count = 0; count < totalHits; ) {
+          log.info("--- filler fillOutDocs in " + fillBuffer);
+          exportWriter.fillOutDocs(leaves, sortDoc, queue, outDocs, fillBuffer);
+          count += (fillBuffer.outDocsIndex + 1);
+          log.info("--- filler count=" + count + ", exchange buffer from " + fillBuffer);
+          fillBuffer = exchange(fillBuffer);
+          if (outputCounter.longValue() > lastOutputCounter) {
+            lastOutputCounter = outputCounter.longValue();
+            flushOutput();
+          }
+          log.info("--- filler got empty buffer " + fillBuffer);
+        }
+        fillBuffer.outDocsIndex = Buffer.NO_MORE_DOCS;
+        log.info("--- filler final exchange buffer from " + fillBuffer);
+        fillBuffer = exchange(fillBuffer);
+        log.info("--- filler final got buffer " + fillBuffer);
+      } catch (Exception e) {
+        log.error("filler", e);
+        shutdownNow();
+      }
+    };
+  }
+
+  private void flushOutput() throws IOException {
+    os.flush();
+  }
+
+  // initial output buffer
+  public Buffer getOutputBuffer() {
+    return outputBuffer;
+  }
+
+  // decorated writer that keeps track of the number of writes
+  public IteratorWriter.ItemWriter getWriter() {
+    return writer;
+  }
+
+  public Buffer exchange(Buffer buffer) throws InterruptedException, TimeoutException {
+    return exchanger.exchange(buffer, EXCHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+  }
+
+  public void shutdownNow() {
+    if (service != null) {
+      log.info("--- shutting down buffers");
+      service.shutdownNow();
+    }
+  }
+
+  /**
+   * Start processing and block until it completes or an exception is thrown.
+   *
+   * @param writer writer that exchanges and processes buffers received from a producer.
+   * @throws IOException on errors
+   */
+  public void run(Callable<Boolean> writer) throws IOException {
+    service = ExecutorUtil.newMDCAwareFixedThreadPool(1, new SolrNamedThreadFactory("ExportBuffers"));
+    try {
+      CompletableFuture.runAsync(filler, service);
+      writer.call();
+
+      // alternatively we could run the writer in a separate thread:
+//        CompletableFuture<Void> allDone = CompletableFuture.allOf(
+//            CompletableFuture.runAsync(filler, service),
+//            CompletableFuture.runAsync(() -> {
+//              try {
+//                writer.call();
+//              } catch (Exception e) {
+//                log.error("writer", e);
+//                shutdownNow();
+//              }
+//            }, service)
+//        );
+//        allDone.join();
+      log.info("-- finished.");
+    } catch (Exception e) {
+      log.error("Exception running filler / writer", e);
+      //
+    } finally {
+      log.info("--- all done, shutting down buffers");
+      service.shutdownNow();
+    }
+  }
+
+  public static final class Buffer implements MapWriter.EntryWriter {
+    static final int EMPTY = -1;
+    static final int NO_MORE_DOCS = -2;
+
+    int outDocsIndex = EMPTY;
+    // use array-of-arrays instead of Map to conserve space
+    Object[][] outDocs;
+    int pos = EMPTY;
+
+    MapWriter.EntryWriter getEntryWriter(int pos, int numFields) {
+      if (outDocs == null) {
+        outDocs = new Object[outDocsIndex + 1][];
+      }
+      this.pos = pos;
+      Object[] fields = new Object[numFields << 1];
+      outDocs[pos] = fields;
+      return this;
+    }
+
+    @Override
+    public MapWriter.EntryWriter put(CharSequence k, Object v) throws IOException {
+      if (pos < 0) {
+        throw new IOException("Invalid entry position");
+      }
+      Object[] fields = outDocs[pos];
+      boolean putOk = false;
+      for (int i = 0; i < fields.length; i += 2) {
+        if (fields[i] == null || fields[i].equals(k)) {
+          fields[i] = k;
+          // convert everything complex into POJOs at this point
+          // to avoid accessing docValues or termEnums from other threads
+          if (v instanceof IteratorWriter) {
+            List lst = new ArrayList();
+            ((IteratorWriter)v).toList(lst);
+            v = lst;
+          } else if (v instanceof MapWriter) {
+            Map<String, Object> map = new HashMap<>();
+            ((MapWriter)v).toMap(map);
+            v = map;
+          }
+          fields[i + 1] = v;
+          putOk = true;
+          break;
+        }
+      }
+      if (!putOk) {
+        throw new IOException("should not happen! pos=" + pos + " ran out of space for field " + k + "=" + v
+            +  " - already full: " + Arrays.toString(fields));
+      }
+      return this;
+    }
+
+    // helper method to make it easier to write our internal key-value array as if it were a map
+    public void writeItem(int pos, IteratorWriter.ItemWriter itemWriter) throws IOException {
+      final Object[] fields = outDocs[pos];
+      if (fields == null) {
+        return;
+      }
+      itemWriter.add((MapWriter) ew -> {
+        for (int i = 0; i < fields.length; i += 2) {
+          if (fields[i] == null) {
+            continue;
+          }
+          ew.put((CharSequence)fields[i], fields[i + 1]);
+        }
+      });
+    }
+
+    public void reset() {
+      outDocsIndex = EMPTY;
+      pos = EMPTY;
+      outDocs = null;
+    }
+
+    @Override
+    public String toString() {
+      return "Buffer@" + Integer.toHexString(hashCode()) + "{" +
+          "outDocsIndex=" + outDocsIndex +
+          '}';
+    }
+  }
+}
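
For context, the double-buffering scheme in ExportBuffers boils down to two
fixed buffers traded through a java.util.concurrent.Exchanger: the filler
fills one buffer while the writer drains the other, and the two sides swap
at each exchange() call, with a sentinel value (Buffer.NO_MORE_DOCS) marking
the end of the stream. A minimal standalone sketch of that pattern follows
(illustrative only, not the committed code; the buffer type, batch sizes and
sentinel are made up):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Exchanger;

    public class DoubleBufferDemo {
      private static final int SENTINEL = -1; // analogous to Buffer.NO_MORE_DOCS

      public static void main(String[] args) throws Exception {
        Exchanger<List<Integer>> exchanger = new Exchanger<>();

        // producer: fills its buffer, then trades the full buffer for a drained one
        Thread filler = new Thread(() -> {
          List<Integer> buf = new ArrayList<>();
          try {
            for (int batch = 0; batch < 3; batch++) {
              for (int i = 0; i < 5; i++) {
                buf.add(batch * 5 + i);
              }
              buf = exchanger.exchange(buf);
            }
            buf.add(SENTINEL); // tell the consumer there is nothing left
            exchanger.exchange(buf);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
        filler.start();

        // consumer: trades its drained buffer for a full one and writes it out
        List<Integer> out = new ArrayList<>();
        while (true) {
          out = exchanger.exchange(out);
          if (out.contains(SENTINEL)) {
            break;
          }
          for (int v : out) {
            System.out.println(v);
          }
          out.clear(); // hand an empty buffer back on the next exchange
        }
        filler.join();
      }
    }

The committed class additionally puts a timeout on each exchange
(EXCHANGE_TIMEOUT_SECONDS) and shuts the executor down on failure, but the
swap protocol is the same.
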
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
index 813db92..36bc40a 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
@@ -24,17 +24,7 @@ import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.lang.invoke.MethodHandles;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Exchanger;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
@@ -46,18 +36,10 @@ import org.apache.lucene.util.BitSetIterator;
 import org.apache.lucene.util.FixedBitSet;
 import org.apache.solr.client.solrj.impl.BinaryResponseParser;
 import org.apache.solr.client.solrj.io.Tuple;
-import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
-import org.apache.solr.client.solrj.io.comp.FieldComparator;
-import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
-import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.client.solrj.io.stream.StreamContext;
 import org.apache.solr.client.solrj.io.stream.TupleStream;
-import org.apache.solr.client.solrj.io.stream.expr.Explanation;
-import org.apache.solr.client.solrj.io.stream.expr.Expressible;
-import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
-import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 import org.apache.solr.common.IteratorWriter;
@@ -68,9 +50,7 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.params.StreamParams;
-import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.JavaBinCodec;
-import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.request.SolrRequestInfo;
@@ -109,7 +89,7 @@ import static org.apache.solr.common.util.Utils.makeMap;
  * Priority Queue.  They are then exported (written across the wire) and marked as sent (unset in the bitmap).
  * This process repeats until all matching documents have been sent.
  * <p>
- * This streaming approach is light on memory (only {@link #DOCUMENT_BATCH_SIZE} documents are ever stored in memory at
+ * This streaming approach is light on memory (only up to 2x {@link #DOCUMENT_BATCH_SIZE} documents are ever stored in memory at
  * once), and it allows {@link ExportWriter} to scale well with regard to numDocs.
  */
 public class ExportWriter implements SolrCore.RawWriter, Closeable {
@@ -117,10 +97,6 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
 
   private static final int DOCUMENT_BATCH_SIZE = 30000;
 
-  private static final String EXPORT_WRITER_KEY = "__ew__";
-  private static final String EXPORT_BUFFERS_KEY = "__eb__";
-  private static final String LEAF_READERS_KEY = "__leaves__";
-
   private OutputStreamWriter respWriter;
   final SolrQueryRequest req;
   final SolrQueryResponse res;
@@ -133,115 +109,6 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
   PushWriter writer;
   private String wt;
 
-  public static class ExportWriterStream extends TupleStream implements Expressible {
-    StreamContext context;
-    StreamComparator streamComparator;
-    int pos = -1;
-    ExportBuffers exportBuffers;
-    Buffer buffer;
-
-    public ExportWriterStream(StreamExpression expression, StreamFactory factory) throws IOException {
-      streamComparator = parseComp(factory.getDefaultSort());
-    }
-
-    @Override
-    public void setStreamContext(StreamContext context) {
-      this.context = context;
-    }
-
-    @Override
-    public List<TupleStream> children() {
-      return null;
-    }
-
-    private StreamComparator parseComp(String sort) throws IOException {
-
-      String[] sorts = sort.split(",");
-      StreamComparator[] comps = new StreamComparator[sorts.length];
-      for (int i = 0; i < sorts.length; i++) {
-        String s = sorts[i];
-
-        String[] spec = s.trim().split("\\s+"); //This should take into account spaces in the sort spec.
-
-        if (spec.length != 2) {
-          throw new IOException("Invalid sort spec:" + s);
-        }
-
-        String fieldName = spec[0].trim();
-        String order = spec[1].trim();
-
-        comps[i] = new FieldComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING);
-      }
-
-      if (comps.length > 1) {
-        return new MultipleFieldComparator(comps);
-      } else {
-        return comps[0];
-      }
-    }
-
-    @Override
-    public void open() throws IOException {
-      exportBuffers = (ExportBuffers) context.get(EXPORT_BUFFERS_KEY);
-      buffer = exportBuffers.getOutputBuffer();
-    }
-
-    @Override
-    public void close() throws IOException {
-      exportBuffers = null;
-    }
-
-    @Override
-    public Tuple read() throws IOException {
-      if (pos < 0) {
-        try {
-          buffer.outDocsIndex = Buffer.EMPTY;
-          log.info("--- ews exchange empty buffer " + buffer);
-          buffer = exportBuffers.exchanger.exchange(buffer);
-          log.info("--- ews got new output buffer " + buffer);
-        } catch (InterruptedException e) {
-          log.info("--- ews interrupt");
-          Thread.currentThread().interrupt();
-          throw new IOException("interrupted");
-        }
-        if (buffer.outDocsIndex == Buffer.NO_MORE_DOCS) {
-          log.info("--- ews EOF");
-          return Tuple.EOF();
-        } else {
-          pos = buffer.outDocsIndex;
-          log.info("--- ews new pos=" + pos);
-        }
-      }
-      if (pos < 0) {
-        log.info("--- ews EOF?");
-        return Tuple.EOF();
-      }
-      Tuple tuple = new Tuple(buffer.outDocs[pos]);
-      pos--;
-      return tuple;
-    }
-
-    @Override
-    public StreamComparator getStreamSort() {
-      return streamComparator;
-    }
-
-    @Override
-    public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
-      StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
-      return expression;
-    }
-
-    @Override
-    public Explanation toExplanation(StreamFactory factory) throws IOException {
-      return new StreamExplanation(getStreamNodeId().toString())
-          .withFunctionName("input")
-          .withImplementingClass(this.getClass().getName())
-          .withExpressionType(Explanation.ExpressionType.STREAM_SOURCE)
-          .withExpression("--non-expressible--");
-    }
-  }
-
 
   public ExportWriter(SolrQueryRequest req, SolrQueryResponse res, String wt, StreamContext initialStreamContext) {
     this.req = req;
@@ -397,7 +264,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
       m.put("responseHeader", singletonMap("status", 0));
       m.put("response", (MapWriter) mw -> {
         mw.put("numFound", totalHits);
-        mw.put("docs", (IteratorWriter) iw -> writeDocs(req, iw, sort));
+        mw.put("docs", (IteratorWriter) iw -> writeDocs(req, os, iw, sort));
       });
     });
     if (streamContext != null) {
@@ -432,7 +299,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
   }
 
   protected void transferBatchToBufferForOutput(SortQueue queue, SortDoc[] outDocs,
-                                                List<LeafReaderContext> leaves, Buffer destination) throws IOException {
+                                                List<LeafReaderContext> leaves, ExportBuffers.Buffer destination) throws IOException {
     int outDocsIndex = -1;
     for (int i = 0; i < queue.maxSize; i++) {
       SortDoc s = queue.pop();
@@ -444,199 +311,14 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
     materializeDocs(leaves, outDocs, destination);
   }
 
-  private static final class Buffer implements EntryWriter {
-    static final int EMPTY = -1;
-    static final int NO_MORE_DOCS = -2;
-
-    int outDocsIndex = EMPTY;
-    // use array-of-arrays instead of Map to conserve space
-    Object[][] outDocs;
-    int pos = EMPTY;
-
-    EntryWriter getEntryWriter(int pos, int numFields) {
-      if (outDocs == null) {
-        outDocs = new Object[outDocsIndex + 1][];
-      }
-      this.pos = pos;
-      Object[] fields = new Object[numFields << 1];
-      outDocs[pos] = fields;
-      return this;
-    }
-
-    @Override
-    public EntryWriter put(CharSequence k, Object v) throws IOException {
-      if (pos < 0) {
-        throw new IOException("Invalid entry position");
-      }
-      Object[] fields = outDocs[pos];
-      boolean putOk = false;
-      for (int i = 0; i < fields.length; i += 2) {
-        if (fields[i] == null || fields[i].equals(k)) {
-          fields[i] = k;
-          // convert everything complex into POJOs at this point
-          if (v instanceof IteratorWriter) {
-            List lst = new ArrayList();
-            ((IteratorWriter)v).toList(lst);
-            v = lst;
-          } else if (v instanceof MapWriter) {
-            Map<String, Object> map = new HashMap<>();
-            ((MapWriter)v).toMap(map);
-            v = map;
-          }
-          fields[i + 1] = v;
-          putOk = true;
-          break;
-        }
-      }
-      if (!putOk) {
-        throw new IOException("should not happen! pos=" + pos + " ran out of space for field " + k + "=" + v
-            +  " - already full: " + Arrays.toString(fields));
-      }
-      return this;
-    }
-
-    public void writeItem(int pos, IteratorWriter.ItemWriter itemWriter) throws IOException {
-      final Object[] fields = outDocs[pos];
-      if (fields == null) {
-        log.info("--- writeItem no fields at " + pos);
-        return;
-      }
-      itemWriter.add((MapWriter) ew -> {
-        for (int i = 0; i < fields.length; i += 2) {
-          if (fields[i] == null) {
-            log.info("--- writeItem empty field at " + pos + "/" + i);
-            continue;
-          }
-          log.info("--- writeItem field " + pos + "/" + i);
-          ew.put((CharSequence)fields[i], fields[i + 1]);
-          log.info("--- writeItem done field " + pos + "/" + i);
-        }
-      });
-    }
-
-    public void reset() {
-      outDocsIndex = EMPTY;
-      pos = EMPTY;
-      outDocs = null;
-    }
-
-    @Override
-    public String toString() {
-      return "Buffer@" + Integer.toHexString(hashCode()) + "{" +
-          "outDocsIndex=" + outDocsIndex +
-          '}';
-    }
-  }
-
-  /* Helper class implementing a "double buffering" producer / consumer. */
-  private static final class ExportBuffers {
-    static final long EXCHANGE_TIMEOUT_SECONDS = 300;
-
-    final Buffer bufferOne;
-    final Buffer bufferTwo;
-    final Exchanger<Buffer> exchanger = new Exchanger<>();
-    final SortDoc[] outDocs;
-    final List<LeafReaderContext> leaves;
-    final ExportWriter exportWriter;
-    final int totalHits;
-    Buffer fillBuffer;
-    Buffer outputBuffer;
-    Runnable filler;
-    ExecutorService service;
-
-    ExportBuffers (ExportWriter exportWriter, List<LeafReaderContext> leaves, SolrIndexSearcher searcher, Sort sort, int queueSize, int totalHits) {
-      this.exportWriter = exportWriter;
-      this.leaves = leaves;
-      this.outDocs = new SortDoc[queueSize];
-      this.bufferOne = new Buffer();
-      this.bufferTwo = new Buffer();
-      this.totalHits = totalHits;
-      fillBuffer = bufferOne;
-      outputBuffer = bufferTwo;
-      filler = () -> {
-        try {
-          SortDoc sortDoc = exportWriter.getSortDoc(searcher, sort.getSort());
-          SortQueue queue = new SortQueue(queueSize, sortDoc);
-          for (int count = 0; count < totalHits; ) {
-            log.info("--- filler fillOutDocs in " + fillBuffer);
-            exportWriter.fillOutDocs(leaves, sortDoc, queue, outDocs, fillBuffer);
-            count += (fillBuffer.outDocsIndex + 1);
-            log.info("--- filler count=" + count + ", exchange buffer from " + fillBuffer);
-            fillBuffer = exchanger.exchange(fillBuffer, EXCHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-            log.info("--- filler got empty buffer " + fillBuffer);
-          }
-          fillBuffer.outDocsIndex = Buffer.NO_MORE_DOCS;
-          log.info("--- filler final exchange buffer from " + fillBuffer);
-          fillBuffer = exchanger.exchange(fillBuffer, EXCHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-          log.info("--- filler final got buffer " + fillBuffer);
-        } catch (Exception e) {
-          log.error("filler", e);
-          shutdownNow();
-        }
-      };
-    }
-
-    public Buffer getOutputBuffer() {
-      return outputBuffer;
-    }
-
-    public Buffer exchange(Buffer buffer) throws InterruptedException, TimeoutException {
-      return exchanger.exchange(buffer, EXCHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-    }
-
-    public void shutdownNow() {
-      if (service != null) {
-        log.info("--- shutting down buffers");
-        service.shutdownNow();
-      }
-    }
-
-    /**
-     * Start processing and block until complete or Exception is thrown.
-     * @param writer writer that exchanges and processes buffers received from a producer.
-     * @throws IOException on errors
-     */
-    public void run(Callable<Boolean> writer) throws IOException {
-      service = ExecutorUtil.newMDCAwareFixedThreadPool(2, new SolrNamedThreadFactory("ExportBuffers"));
-      try {
-        CompletableFuture.runAsync(filler, service);
-        writer.call();
-
-        // alternatively we could run the writer in a separate thread too:
-//        CompletableFuture<Void> allDone = CompletableFuture.allOf(
-//            CompletableFuture.runAsync(filler, service),
-//            CompletableFuture.runAsync(() -> {
-//              try {
-//                writer.call();
-//              } catch (Exception e) {
-//                log.error("writer", e);
-//                shutdownNow();
-//              }
-//            }, service)
-//        );
-//        allDone.join();
-        log.info("-- finished.");
-      } catch (Exception e) {
-        log.error("Exception running filler / writer", e);
-         //
-      } finally {
-        log.info("--- all done, shutting down buffers");
-        service.shutdownNow();
-      }
-    }
-  }
-
-  protected void writeDocs(SolrQueryRequest req, IteratorWriter.ItemWriter writer, Sort sort) throws IOException {
+  protected void writeDocs(SolrQueryRequest req, OutputStream os, IteratorWriter.ItemWriter writer, Sort sort) throws IOException {
     List<LeafReaderContext> leaves = req.getSearcher().getTopReaderContext().leaves();
-    SortDoc sortDoc = getSortDoc(req.getSearcher(), sort.getSort());
     final int queueSize = Math.min(DOCUMENT_BATCH_SIZE, totalHits);
 
-    ExportBuffers buffers = new ExportBuffers(this, leaves, req.getSearcher(), sort, queueSize, totalHits);
+    ExportBuffers buffers = new ExportBuffers(this, leaves, req.getSearcher(), os, writer, sort, queueSize, totalHits);
 
     if (streamExpression != null) {
-      streamContext.put(EXPORT_WRITER_KEY, this);
-      streamContext.put(EXPORT_BUFFERS_KEY, buffers);
-      streamContext.put(LEAF_READERS_KEY, leaves);
+      streamContext.put(ExportBuffers.EXPORT_BUFFERS_KEY, buffers);
       final TupleStream tupleStream = createTupleStream();
       tupleStream.open();
       buffers.run(() -> {
@@ -651,23 +333,29 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
           if (t.EOF) {
             break;
           }
-          writer.add((MapWriter) ew -> t.writeMap(ew));
+          // use the decorated writer to track the number of output writes, so the
+          // filler can flush the output promptly even when an expression reduces the item count
+          buffers.getWriter().add((MapWriter) ew -> t.writeMap(ew));
         }
         return true;
       });
       tupleStream.close();
     } else {
       buffers.run(() -> {
-        Buffer buffer = buffers.getOutputBuffer();
+        // get the initial buffer
+        ExportBuffers.Buffer buffer = buffers.getOutputBuffer();
         log.info("--- writer init exchanging from " + buffer);
         buffer = buffers.exchange(buffer);
         log.info("--- writer init got " + buffer);
-        while (buffer.outDocsIndex != Buffer.NO_MORE_DOCS) {
+        while (buffer.outDocsIndex != ExportBuffers.Buffer.NO_MORE_DOCS) {
           if (Thread.currentThread().isInterrupted()) {
             log.info("--- writer interrupted");
             break;
           }
           for (int i = buffer.outDocsIndex; i >= 0; --i) {
+            // we're using the raw writer here because there's no potential
+            // reduction in the number of output items, unlike when using
+            // streaming expressions
             buffer.writeItem(i, writer);
           }
           log.info("--- writer exchanging from " + buffer);
@@ -679,13 +367,13 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
     }
   }
 
-  private void fillOutDocs(List<LeafReaderContext> leaves, SortDoc sortDoc,
-                          SortQueue sortQueue, SortDoc[] outDocs, Buffer buffer) throws IOException {
+  void fillOutDocs(List<LeafReaderContext> leaves, SortDoc sortDoc,
+                          SortQueue sortQueue, SortDoc[] outDocs, ExportBuffers.Buffer buffer) throws IOException {
     identifyLowestSortingUnexportedDocs(leaves, sortDoc, sortQueue);
     transferBatchToBufferForOutput(sortQueue, outDocs, leaves, buffer);
   }
 
-  private void materializeDocs(List<LeafReaderContext> leaves, SortDoc[] outDocs, Buffer buffer) throws IOException {
+  private void materializeDocs(List<LeafReaderContext> leaves, SortDoc[] outDocs, ExportBuffers.Buffer buffer) throws IOException {
     log.info("--- materialize docs in " + buffer);
     if (buffer.outDocsIndex < 0) {
       return;
@@ -785,7 +473,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
     return writers;
   }
 
-  private SortDoc getSortDoc(SolrIndexSearcher searcher, SortField[] sortFields) throws IOException {
+  SortDoc getSortDoc(SolrIndexSearcher searcher, SortField[] sortFields) throws IOException {
     SortValue[] sortValues = new SortValue[sortFields.length];
     IndexSchema schema = searcher.getSchema();
     for (int i = 0; i < sortFields.length; ++i) {
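
The Buffer class (both the removed inner version above and the new top-level
one) stores each document's fields in a flat Object[] of interleaved
key/value pairs (numFields << 1 slots) rather than a per-document HashMap,
trading generality for fewer allocations. A small sketch of that layout
(illustrative only; the names are made up):

    import java.util.Arrays;

    public class FlatFieldsDemo {
      public static void main(String[] args) {
        int numFields = 3;
        // interleaved layout: [k0, v0, k1, v1, k2, v2]
        Object[] fields = new Object[numFields << 1];

        put(fields, "id", 42);
        put(fields, "name", "doc-42");
        put(fields, "id", 43); // an existing key is overwritten in place

        System.out.println(Arrays.toString(fields));
        // prints: [id, 43, name, doc-42, null, null]
      }

      // linear scan: the first slot whose key is empty or equal wins
      static void put(Object[] fields, CharSequence k, Object v) {
        for (int i = 0; i < fields.length; i += 2) {
          if (fields[i] == null || fields[i].equals(k)) {
            fields[i] = k;
            fields[i + 1] = v;
            return;
          }
        }
        throw new IllegalStateException("ran out of space for field " + k);
      }
    }

The linear scan is cheap because exported documents have a small, fixed set
of fields, and the same Object[] rows are later replayed through writeItem()
as if they were a map.
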
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java
new file mode 100644
index 0000000..0a783a3
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.export;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.StreamContext;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stream implementation that supports 'expr' streaming in the export writer.
+ * <p>Note: this class is made public only to allow access from {@link org.apache.solr.handler.ExportHandler};
+ * it should be treated as an internal implementation detail.</p>
+ * @lucene.experimental
+ */
+public class ExportWriterStream extends TupleStream implements Expressible {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  StreamContext context;
+  StreamComparator streamComparator;
+  int pos = -1;
+  ExportBuffers exportBuffers;
+  ExportBuffers.Buffer buffer;
+
+  public ExportWriterStream(StreamExpression expression, StreamFactory factory) throws IOException {
+    streamComparator = parseComp(factory.getDefaultSort());
+  }
+
+  /**
+   * NOTE: this context must contain an instance of {@link ExportBuffers} under the
+   * {@link ExportBuffers#EXPORT_BUFFERS_KEY} key.
+   */
+  @Override
+  public void setStreamContext(StreamContext context) {
+    this.context = context;
+  }
+
+  @Override
+  public List<TupleStream> children() {
+    return null;
+  }
+
+  private StreamComparator parseComp(String sort) throws IOException {
+
+    String[] sorts = sort.split(",");
+    StreamComparator[] comps = new StreamComparator[sorts.length];
+    for (int i = 0; i < sorts.length; i++) {
+      String s = sorts[i];
+
+      String[] spec = s.trim().split("\\s+"); // splitting on \s+ takes extra whitespace in the sort spec into account
+
+      if (spec.length != 2) {
+        throw new IOException("Invalid sort spec:" + s);
+      }
+
+      String fieldName = spec[0].trim();
+      String order = spec[1].trim();
+
+      comps[i] = new FieldComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING);
+    }
+
+    if (comps.length > 1) {
+      return new MultipleFieldComparator(comps);
+    } else {
+      return comps[0];
+    }
+  }
+
+  @Override
+  public void open() throws IOException {
+    exportBuffers = (ExportBuffers) context.get(ExportBuffers.EXPORT_BUFFERS_KEY);
+    buffer = exportBuffers.getOutputBuffer();
+  }
+
+  @Override
+  public void close() throws IOException {
+    exportBuffers = null;
+  }
+
+  @Override
+  public Tuple read() throws IOException {
+    if (pos < 0) {
+      try {
+        buffer.outDocsIndex = ExportBuffers.Buffer.EMPTY;
+        log.info("--- ews exchange empty buffer " + buffer);
+        buffer = exportBuffers.exchanger.exchange(buffer);
+        log.info("--- ews got new output buffer " + buffer);
+      } catch (InterruptedException e) {
+        log.info("--- ews interrupt");
+        Thread.currentThread().interrupt();
+        throw new IOException("interrupted");
+      }
+      if (buffer.outDocsIndex == ExportBuffers.Buffer.NO_MORE_DOCS) {
+        log.info("--- ews EOF");
+        return Tuple.EOF();
+      } else {
+        pos = buffer.outDocsIndex;
+        log.info("--- ews new pos=" + pos);
+      }
+    }
+    if (pos < 0) {
+      log.info("--- ews EOF?");
+      return Tuple.EOF();
+    }
+    Tuple tuple = new Tuple(buffer.outDocs[pos]);
+    pos--;
+    return tuple;
+  }
+
+  @Override
+  public StreamComparator getStreamSort() {
+    return streamComparator;
+  }
+
+  @Override
+  public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+    return expression;
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+    return new StreamExplanation(getStreamNodeId().toString())
+        .withFunctionName("input")
+        .withImplementingClass(this.getClass().getName())
+        .withExpressionType(Explanation.ExpressionType.STREAM_SOURCE)
+        .withExpression("--non-expressible--");
+  }
+}
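
For reference, parseComp() turns a comma-separated sort spec such as
"year desc,title asc" into one FieldComparator per clause, wrapped in a
MultipleFieldComparator when there is more than one. A standalone sketch of
the same parsing logic without the solrj types (illustrative only):

    import java.util.ArrayList;
    import java.util.List;

    public class SortSpecDemo {

      // parse "field1 asc,field2 desc" into "field:ORDER" descriptors
      static List<String> parse(String sort) {
        List<String> comps = new ArrayList<>();
        for (String s : sort.split(",")) {
          String[] spec = s.trim().split("\\s+"); // tolerates repeated spaces inside a clause
          if (spec.length != 2) {
            throw new IllegalArgumentException("Invalid sort spec: " + s);
          }
          String field = spec[0].trim();
          boolean asc = spec[1].trim().equalsIgnoreCase("asc");
          comps.add(field + ":" + (asc ? "ASCENDING" : "DESCENDING"));
        }
        return comps;
      }

      public static void main(String[] args) {
        System.out.println(parse("year desc, title asc"));
        // prints: [year:DESCENDING, title:ASCENDING]
      }
    }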