You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2021/03/04 17:44:20 UTC

[lucene-solr] 01/01: SOLR-15210: Initial patch, no tests yet.

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch jira/solr-15210
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 5e4242ff69081971839721d46d29655bd8f4e445
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Thu Mar 4 18:43:43 2021 +0100

    SOLR-15210: Initial patch, no tests yet.
---
 .../apache/solr/handler/export/ExportBuffers.java  |   2 +-
 .../apache/solr/handler/export/ExportWriter.java   | 289 +++++++++++++++++----
 .../solr/handler/export/ExportWriterStream.java    | 137 +++++-----
 .../java/org/apache/solr/search/CaffeineCache.java |  10 +-
 .../client/solrj/io/stream/ParallelStream.java     |  10 +-
 .../solr/client/solrj/io/stream/SolrStream.java    |  10 +-
 6 files changed, 334 insertions(+), 124 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java b/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
index 9e5478f..59c575c 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
@@ -100,7 +100,7 @@ class ExportBuffers {
         long lastOutputCounter = 0;
         for (int count = 0; count < totalHits; ) {
           // log.debug("--- filler fillOutDocs in {}", fillBuffer);
-          exportWriter.fillOutDocs(mergeIterator, buffer);
+          exportWriter.fillNextBuffer(mergeIterator, buffer);
           count += (buffer.outDocsIndex + 1);
           // log.debug("--- filler count={}, exchange buffer from {}", count, buffer);
           try {
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
index f2d0b9b..df03b40 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
@@ -24,19 +24,21 @@ import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.lang.invoke.MethodHandles;
 import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.TreeSet;
+import java.util.*;
 
+import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.SortedDocValues;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.Sort;
 import org.apache.lucene.search.SortField;
+import org.apache.lucene.util.BitSet;
 import org.apache.lucene.util.BitSetIterator;
 import org.apache.lucene.util.FixedBitSet;
 import org.apache.solr.client.solrj.impl.BinaryResponseParser;
 import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.ParallelStream;
 import org.apache.solr.client.solrj.io.stream.StreamContext;
 import org.apache.solr.client.solrj.io.stream.TupleStream;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
@@ -52,6 +54,7 @@ import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.params.StreamParams;
 import org.apache.solr.common.util.JavaBinCodec;
+import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.metrics.SolrMetricsContext;
 import org.apache.solr.request.SolrQueryRequest;
@@ -71,9 +74,7 @@ import org.apache.solr.schema.LongValueFieldType;
 import org.apache.solr.schema.SchemaField;
 import org.apache.solr.schema.SortableTextField;
 import org.apache.solr.schema.StrField;
-import org.apache.solr.search.SolrIndexSearcher;
-import org.apache.solr.search.SortSpec;
-import org.apache.solr.search.SyntaxError;
+import org.apache.solr.search.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,6 +98,8 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
   public static final String BATCH_SIZE_PARAM = "batchSize";
   public static final String QUEUE_SIZE_PARAM = "queueSize";
 
+  public static final String SOLR_CACHE_KEY = "exportCache";
+
   public static final int DEFAULT_BATCH_SIZE = 30000;
   public static final int DEFAULT_QUEUE_SIZE = 150000;
 
@@ -117,13 +120,24 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
   int totalHits = 0;
   FixedBitSet[] sets = null;
   PushWriter writer;
-  private String wt;
+  final private String wt;
+  final int numWorkers;
+  final int workerId;
+  final String fieldList;
+  final List<String> partitionKeys;
+  final String partitionCacheKey;
+
+  // per-segment caches for already populated partitioning filters when parallel() is in use
+  final SolrCache<IndexReader.CacheKey, SolrCache<String, FixedBitSet>> partitionCaches;
+
+  // per-segment partitioning filters that are incomplete (still being updated from the current request)
+  final Map<IndexReader.CacheKey, FixedBitSet> tempPartitionCaches;
 
 
 
   public ExportWriter(SolrQueryRequest req, SolrQueryResponse res, String wt,
                       StreamContext initialStreamContext, SolrMetricsContext solrMetricsContext,
-                      String metricsPath) {
+                      String metricsPath) throws Exception {
     this.req = req;
     this.res = res;
     this.wt = wt;
@@ -131,14 +145,35 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
     this.solrMetricsContext = solrMetricsContext;
     this.metricsPath = metricsPath;
     this.priorityQueueSize = req.getParams().getInt(QUEUE_SIZE_PARAM, DEFAULT_QUEUE_SIZE);
+    this.numWorkers = req.getParams().getInt(ParallelStream.NUM_WORKERS_PARAM, 1);
+    this.workerId = req.getParams().getInt(ParallelStream.WORKER_ID_PARAM, 0);
+    boolean useHashQuery = req.getParams().getBool(ParallelStream.USE_HASH_QUERY_PARAM, false);
+    if (numWorkers > 1 && !useHashQuery) {
+      String keysList = req.getParams().get(ParallelStream.PARTITION_KEYS_PARAM);
+      if (keysList == null || keysList.trim().equals("none")) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "when numWorkers > 1 partitionKeys MUST be specified!");
+      }
+      partitionKeys = StrUtils.splitSmart(keysList, ',', true);
+      // we have to use ALL parameters as a cache key to account for different queries
+      partitionCacheKey = req.getParamString();
+      tempPartitionCaches = new HashMap<>();
+    } else {
+      partitionKeys = null;
+      partitionCacheKey = null;
+      tempPartitionCaches = null;
+    }
+    this.fieldList = req.getParams().get(CommonParams.FL);
     this.batchSize = DEFAULT_BATCH_SIZE;
+    this.partitionCaches = req.getSearcher().getCache(SOLR_CACHE_KEY);
   }
 
   @Override
   public String getContentType() {
     if ("javabin".equals(wt)) {
       return BinaryResponseParser.BINARY_CONTENT_TYPE;
-    } else return "json";
+    } else {
+      return "json";
+    }
   }
 
   @Override
@@ -237,15 +272,14 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
       }
     }
     SolrParams params = req.getParams();
-    String fl = params.get("fl");
 
     String[] fields = null;
 
-    if (fl == null) {
+    if (fieldList == null) {
       writeException((new IOException(new SyntaxError("export field list (fl) must be specified."))), writer, true);
       return;
     } else {
-      fields = fl.split(",");
+      fields = fieldList.split(",");
 
       for (int i = 0; i < fields.length; i++) {
 
@@ -265,6 +299,8 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
       return;
     }
 
+    outputDoc = new DoubleArrayMapWriter(fieldWriters.length);
+
     String expr = params.get(StreamParams.EXPR);
     if (expr != null) {
       StreamFactory streamFactory = initialStreamContext.getStreamFactory();
@@ -321,34 +357,12 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
     return tupleStream;
   }
 
-  private void transferBatchToBufferForOutput(MergeIterator mergeIterator,
-                                              ExportBuffers.Buffer destination) throws IOException {
-    try {
-      int outDocsIndex = -1;
-      for (int i = 0; i < batchSize; i++) {
-        SortDoc sortDoc = mergeIterator.next();
-        if (sortDoc != null) {
-          destination.outDocs[++outDocsIndex].setValues(sortDoc);
-        } else {
-          break;
-        }
-      }
-      destination.outDocsIndex = outDocsIndex;
-    } catch (Throwable t) {
-      log.error("transfer", t);
-      if (t instanceof InterruptedException) {
-        Thread.currentThread().interrupt();
-      }
-      throw t;
-    } finally {
-
-    }
-  }
-
   protected void writeDocs(SolrQueryRequest req, OutputStream os, IteratorWriter.ItemWriter writer, Sort sort) throws IOException {
     List<LeafReaderContext> leaves = req.getSearcher().getTopReaderContext().leaves();
     final int queueSize = Math.min(batchSize, totalHits);
-
+    if (tempPartitionCaches != null && partitionCaches != null) {
+      initTempPartitionCaches(leaves);
+    }
 
     ExportBuffers buffers = new ExportBuffers(this,
                                               leaves,
@@ -417,7 +431,10 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
               // reduction in the number of output items, unlike when using
               // streaming expressions
               final SortDoc currentDoc = buffer.outDocs[i];
-              writer.add((MapWriter) ew -> writeDoc(currentDoc, leaves, ew, fieldWriters));
+              MapWriter outputDoc = fillOutputDoc(currentDoc, leaves, fieldWriters);
+              if (outputDoc != null) {
+                writer.add(outputDoc);
+              }
             }
           } finally {
           }
@@ -437,24 +454,172 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
         return true;
       });
     }
+    transferTempPartitionCaches();
+  }
+
+  /**
+   * This method transfers the newly built per-segment partitioning bitsets to the global cache,
+   * keyed by the current query.
+   */
+  private void transferTempPartitionCaches() {
+    if (tempPartitionCaches == null || partitionCaches == null) {
+      return;
+    }
+    tempPartitionCaches.forEach((cacheKey, partitionSet) -> {
+      SolrCache<String, FixedBitSet> perSegmentCache = partitionCaches.computeIfAbsent(cacheKey, k -> {
+        CaffeineCache<String, FixedBitSet> cache = new CaffeineCache<>();
+        cache.init(
+                Map.of(
+                        // 100 unique queries should be enough for anyone ;)
+                        SolrCache.SIZE_PARAM, "100",
+                        // evict entries after 600 sec
+                        SolrCache.MAX_IDLE_TIME_PARAM, "600"),
+                null, null);
+        return cache;
+      });
+      // use our unique query+numWorkers+worker key
+      perSegmentCache.put(partitionCacheKey, partitionSet);
+    });
+  }
+
+  // this inits only those sets that are not already present in the global cache
+  private void initTempPartitionCaches(List<LeafReaderContext> leaves) {
+    tempPartitionCaches.clear();
+    for (LeafReaderContext leaf : leaves) {
+      IndexReader.CacheHelper cacheHelper = leaf.reader().getReaderCacheHelper();
+      if (cacheHelper == null) {
+        continue;
+      }
+      IndexReader.CacheKey cacheKey = cacheHelper.getKey();
+      // check if bitset was computed earlier and can be skipped
+      SolrCache<String, FixedBitSet> perSegmentCache = partitionCaches.get(cacheKey);
+      if (perSegmentCache != null && perSegmentCache.get(partitionCacheKey) != null) {
+        // already computed earlier
+        continue;
+      }
+      tempPartitionCaches.put(cacheKey, new FixedBitSet(leaf.reader().maxDoc()));
+    }
+  }
+
+  void fillNextBuffer(MergeIterator mergeIterator,
+                      ExportBuffers.Buffer buffer) throws IOException {
+    try {
+      int outDocsIndex = -1;
+      for (int i = 0; i < batchSize; i++) {
+        SortDoc sortDoc = mergeIterator.next();
+        if (sortDoc != null) {
+          buffer.outDocs[++outDocsIndex].setValues(sortDoc);
+        } else {
+          break;
+        }
+      }
+      buffer.outDocsIndex = outDocsIndex;
+    } catch (Throwable t) {
+      log.error("transfer", t);
+      if (t instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      throw t;
+    } finally {
+
+    }
   }
 
-  void fillOutDocs(MergeIterator mergeIterator,
-                   ExportBuffers.Buffer buffer) throws IOException {
-    transferBatchToBufferForOutput(mergeIterator, buffer);
+  private static final class DoubleArrayMapWriter implements MapWriter, EntryWriter {
+    final CharSequence[] keys;
+    final Object[] values;
+    int pos;
+
+    DoubleArrayMapWriter(int size) {
+      keys = new CharSequence[size];
+      values = new Object[size];
+      pos = 0;
+    }
+
+    @Override
+    public EntryWriter put(CharSequence k, Object v) throws IOException {
+      keys[pos] = k;
+      values[pos] = v;
+      pos++;
+      return this;
+    }
+
+    public void clear() {
+      for (int i = 0; i < pos; i++) {
+        keys[i] = null;
+        values[i] = null;
+      }
+      pos = 0;
+    }
+
+    @Override
+    public void writeMap(EntryWriter ew) throws IOException {
+      for (int i = 0; i < pos; i++) {
+        ew.put(keys[i], values[i]);
+      }
+    }
+
+    public Object get(CharSequence key) {
+      for (int i = 0; i < pos; i++) {
+        if (keys[i].equals(key)) {
+          return values[i];
+        }
+      }
+      return null;
+    }
   }
 
-  void writeDoc(SortDoc sortDoc,
-                List<LeafReaderContext> leaves,
-                EntryWriter ew, FieldWriter[] writers) throws IOException {
+  // we materialize this document so that we can potentially do hash partitioning
+  private DoubleArrayMapWriter outputDoc;
+
+  // WARNING: single-thread only! Reuses the shared outputDoc; returns null for docs outside this worker's partition.
+  MapWriter fillOutputDoc(SortDoc sortDoc,
+                          List<LeafReaderContext> leaves,
+                          FieldWriter[] writers) throws IOException {
     int ord = sortDoc.ord;
     LeafReaderContext context = leaves.get(ord);
+    // reuse the shared instance: drop the previous document's entries before refilling it
+    outputDoc.clear();
     int fieldIndex = 0;
     for (FieldWriter fieldWriter : writers) {
-      if (fieldWriter.write(sortDoc, context, ew, fieldIndex)) {
+      if (fieldWriter.write(sortDoc, context, outputDoc, fieldIndex)) {
         ++fieldIndex;
       }
     }
+    if (partitionKeys == null) {
+      return outputDoc; // not partitioning (partitionFilter would dereference null partitionKeys): emit every doc
+    } else {
+      // if we use partitioning then filter out unwanted docs
+      return partitionFilter(sortDoc, context, outputDoc);
+    }
+  }
+
+  MapWriter partitionFilter(SortDoc sortDoc, LeafReaderContext leaf, DoubleArrayMapWriter doc) {
+    // sum the hashCode() of every partition key value found in the doc; keys with no value contribute nothing
+    int hash = 0;
+    for (String key : partitionKeys) {
+      Object value = doc.get(key);
+      if (value != null) {
+        hash += value.hashCode();
+      }
+    }
+    if ((hash & 0x7FFFFFFF) % numWorkers == workerId) {
+      // the doc falls into this worker's partition (sign bit masked off so the modulo is non-negative)
+      // record it in the per-segment set being built for this request, if one exists for this segment
+      IndexReader.CacheHelper cacheHelper = leaf.reader().getReaderCacheHelper();
+      if (cacheHelper != null) {
+        IndexReader.CacheKey cacheKey = cacheHelper.getKey();
+        FixedBitSet partitionSet = tempPartitionCaches.get(cacheKey);
+        if (partitionSet != null) {
+          // a set is still being built for this segment - mark the doc in it
+          partitionSet.set(sortDoc.docId);
+        }
+      }
+      return doc;
+    } else {
+      // not our partition: signal the caller to skip this doc (XXX update the cache)
+      return null;
+    }
   }
 
   public FieldWriter[] getFieldWriters(String[] fields, SolrIndexSearcher searcher) throws IOException {
@@ -702,7 +867,9 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
       SegmentIterator[] segmentIterators = new SegmentIterator[leaves.size()];
       for (int i = 0; i < segmentIterators.length; i++) {
         SortQueue sortQueue = new SortQueue(sizes[i], sortDoc.copy());
-        segmentIterators[i] = new SegmentIterator(bits[i], leaves.get(i), sortQueue, sortDoc.copy());
+        // check if we have an existing partition filter and use it if present
+        FixedBitSet myPartitionSet = partitionCacheKey != null ? getMyPartitionSet(leaves.get(i)) : null;
+        segmentIterators[i] = new SegmentIterator(bits[i], myPartitionSet, leaves.get(i), sortQueue, sortDoc.copy());
       }
 
       return new MergeIterator(segmentIterators, sortDoc);
@@ -710,6 +877,24 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
     }
   }
 
+  private FixedBitSet getMyPartitionSet(LeafReaderContext leaf) {
+    if (partitionCaches == null) {
+      return null;
+    }
+    IndexReader.CacheHelper cacheHelper = leaf.reader().getReaderCacheHelper();
+    if (cacheHelper == null) {
+      return null;
+    }
+    IndexReader.CacheKey cacheKey = cacheHelper.getKey();
+
+    SolrCache<String, FixedBitSet> perSegmentCaches = partitionCaches.get(cacheKey);
+    if (perSegmentCaches == null) {
+      // no queries yet for this segment
+      return null;
+    }
+    return perSegmentCaches.get(partitionCacheKey);
+  }
+
   private static class SegmentIterator {
 
     private final FixedBitSet bits;
@@ -722,8 +907,20 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
     private int index;
 
 
-    public SegmentIterator(FixedBitSet bits, LeafReaderContext context, SortQueue sortQueue, SortDoc sortDoc) throws IOException {
+    /**
+     * Construct per-segment iterator for matching docs.
+     * @param bits matching document id-s in the segment
+     * @param myPartitionSet filter to match only the docs in the current worker's partition, may be
+     *                       null if not partitioning
+     * @param context segment context
+     * @param sortQueue sort queue
+     * @param sortDoc proto sort document
+     */
+    public SegmentIterator(FixedBitSet bits, FixedBitSet myPartitionSet, LeafReaderContext context, SortQueue sortQueue, SortDoc sortDoc) throws IOException {
       this.bits = bits;
+      if (myPartitionSet != null) {
+        this.bits.and(myPartitionSet);
+      }
       this.queue = sortQueue;
       this.sortDoc = sortDoc;
       this.nextDoc = sortDoc.copy();
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java
index 3d0b3b1..f923a3d 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java
@@ -137,81 +137,86 @@ public class ExportWriterStream extends TupleStream implements Expressible {
   @Override
   public Tuple read() throws IOException {
     Tuple res = null;
-    if (pos < 0) {
-
-      try {
-        buffer.outDocsIndex = ExportBuffers.Buffer.EMPTY;
-        //log.debug("--- ews exchange empty buffer {}", buffer);
-        boolean exchanged = false;
-        while (!exchanged) {
-          try {
-            long startExchangeBuffers = System.nanoTime();
-            exportBuffers.exchangeBuffers();
-            long endExchangeBuffers = System.nanoTime();
-            if(log.isDebugEnabled()) {
-              log.debug("Waited for reader thread:{}",  Long.toString(((endExchangeBuffers - startExchangeBuffers) / 1000000)));
-            }
-            exchanged = true;
-          } catch (TimeoutException e) {
-            log.debug("--- ews timeout loop");
-            if (exportBuffers.isShutDown()) {
-              log.debug("--- ews - the other end is shutdown, returning EOF");
-              res = Tuple.EOF();
-              break;
-            }
-            continue;
-          } catch (InterruptedException e) {
-            log.debug("--- ews interrupted");
-            exportBuffers.error(e);
-            res = Tuple.EXCEPTION(e, true);
-            break;
-          } catch (BrokenBarrierException e) {
-            if (exportBuffers.getError() != null) {
-              res = Tuple.EXCEPTION(exportBuffers.getError(), true);
-            } else {
+    do {
+      if (pos < 0) {
+
+        try {
+          buffer.outDocsIndex = ExportBuffers.Buffer.EMPTY;
+          //log.debug("--- ews exchange empty buffer {}", buffer);
+          boolean exchanged = false;
+          while (!exchanged) {
+            try {
+              long startExchangeBuffers = System.nanoTime();
+              exportBuffers.exchangeBuffers();
+              long endExchangeBuffers = System.nanoTime();
+              if(log.isDebugEnabled()) {
+                log.debug("Waited for reader thread:{}",  Long.toString(((endExchangeBuffers - startExchangeBuffers) / 1000000)));
+              }
+              exchanged = true;
+            } catch (TimeoutException e) {
+              log.debug("--- ews timeout loop");
+              if (exportBuffers.isShutDown()) {
+                log.debug("--- ews - the other end is shutdown, returning EOF");
+                res = Tuple.EOF();
+                break;
+              }
+              continue;
+            } catch (InterruptedException e) {
+              log.debug("--- ews interrupted");
+              exportBuffers.error(e);
               res = Tuple.EXCEPTION(e, true);
+              break;
+            } catch (BrokenBarrierException e) {
+              if (exportBuffers.getError() != null) {
+                res = Tuple.EXCEPTION(exportBuffers.getError(), true);
+              } else {
+                res = Tuple.EXCEPTION(e, true);
+              }
+              break;
+            } finally {
             }
-            break;
-          } finally {
           }
+        } catch (InterruptedException e) {
+          log.debug("--- ews interrupt");
+          exportBuffers.error(e);
+          res = Tuple.EXCEPTION(e, true);
+        } catch (Exception e) {
+          log.debug("--- ews exception", e);
+          exportBuffers.error(e);
+          res = Tuple.EXCEPTION(e, true);
+        }
+        buffer = exportBuffers.getOutputBuffer();
+        if (buffer == null) {
+          res = Tuple.EOF();
+        }
+        if (buffer.outDocsIndex == ExportBuffers.Buffer.NO_MORE_DOCS) {
+          log.debug("--- ews EOF");
+          res = Tuple.EOF();
+        } else {
+          pos = buffer.outDocsIndex;
+          index = -1;  //restart index.
+          log.debug("--- ews new pos={}", pos);
         }
-      } catch (InterruptedException e) {
-        log.debug("--- ews interrupt");
-        exportBuffers.error(e);
-        res = Tuple.EXCEPTION(e, true);
-      } catch (Exception e) {
-        log.debug("--- ews exception", e);
-        exportBuffers.error(e);
-        res = Tuple.EXCEPTION(e, true);
       }
-      buffer = exportBuffers.getOutputBuffer();
-      if (buffer == null) {
+      if (pos < 0) {
+        log.debug("--- ews EOF?");
         res = Tuple.EOF();
       }
-      if (buffer.outDocsIndex == ExportBuffers.Buffer.NO_MORE_DOCS) {
-        log.debug("--- ews EOF");
-        res = Tuple.EOF();
-      } else {
-        pos = buffer.outDocsIndex;
-        index = -1;  //restart index.
-        log.debug("--- ews new pos={}", pos);
+      if (res != null) {
+        // only errors or EOF assigned result so far
+        return res;
       }
-    }
-    if (pos < 0) {
-      log.debug("--- ews EOF?");
-      res = Tuple.EOF();
-    }
-    if (res != null) {
-      // only errors or EOF assigned result so far
 
-      return res;
-    }
-
-    SortDoc sortDoc = buffer.outDocs[++index];
-    tupleEntryWriter.tuple = new Tuple();
-    exportBuffers.exportWriter.writeDoc(sortDoc, exportBuffers.leaves, tupleEntryWriter, exportBuffers.exportWriter.fieldWriters);
-    pos--;
-    return tupleEntryWriter.tuple;
+      SortDoc sortDoc = buffer.outDocs[++index];
+      MapWriter outputDoc = exportBuffers.exportWriter.fillOutputDoc(sortDoc, exportBuffers.leaves, exportBuffers.exportWriter.fieldWriters);
+      pos--;
+      if (outputDoc != null) {
+        tupleEntryWriter.tuple = new Tuple();
+        outputDoc.writeMap(tupleEntryWriter);
+        return tupleEntryWriter.tuple;
+      }
+    } while (res == null);
+    return res;
   }
 
   @Override
diff --git a/solr/core/src/java/org/apache/solr/search/CaffeineCache.java b/solr/core/src/java/org/apache/solr/search/CaffeineCache.java
index 756718c..e9ade62 100644
--- a/solr/core/src/java/org/apache/solr/search/CaffeineCache.java
+++ b/solr/core/src/java/org/apache/solr/search/CaffeineCache.java
@@ -25,11 +25,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Function;
 
@@ -225,6 +221,10 @@ public class CaffeineCache<K, V> extends SolrCacheBase implements SolrCache<K, V
     ramBytes.reset();
   }
 
+  public ConcurrentMap<K, V> asMap() {
+    return cache.asMap();
+  }
+
   @Override
   public int size() {
     return cache.asMap().size();
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
index 0d48b0b..b567f86 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
@@ -45,6 +45,11 @@ import static org.apache.solr.common.params.CommonParams.SORT;
  **/
 public class ParallelStream extends CloudSolrStream implements Expressible {
 
+  public static final String NUM_WORKERS_PARAM = "numWorkers";
+  public static final String WORKER_ID_PARAM = "workerID";
+  public static final String PARTITION_KEYS_PARAM = "partitionKeys";
+  public static final String USE_HASH_QUERY_PARAM = "useHashQuery";
+
   private TupleStream tupleStream;
   private int workers;
   private transient StreamFactory streamFactory;
@@ -252,8 +257,9 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
       for(int w=0; w<workers; w++) {
         ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
         paramsLoc.set(DISTRIB,"false"); // We are the aggregator.
-        paramsLoc.set("numWorkers", workers);
-        paramsLoc.set("workerID", w);
+        paramsLoc.set(NUM_WORKERS_PARAM, workers);
+        paramsLoc.set(WORKER_ID_PARAM, w);
+        paramsLoc.set(USE_HASH_QUERY_PARAM, false);
 
         paramsLoc.set("expr", pushStream.toString());
         paramsLoc.set("qt","/stream");
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
index e6210ee..58accf5 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
@@ -152,15 +152,17 @@ public class SolrStream extends TupleStream {
   private ModifiableSolrParams loadParams(SolrParams paramsIn) throws IOException {
     ModifiableSolrParams solrParams = new ModifiableSolrParams(paramsIn);
     if (params.get("partitionKeys") != null) {
-      if(!params.get("partitionKeys").equals("none") && numWorkers > 1) {
-        String partitionFilter = getPartitionFilter();
-        solrParams.add("fq", partitionFilter);
+      if (!params.get("partitionKeys").equals("none") && numWorkers > 1) {
+        if (params.getBool(ParallelStream.USE_HASH_QUERY_PARAM, false)) {
+          String partitionFilter = getPartitionFilter();
+          solrParams.add("fq", partitionFilter);
+        }
       }
     } else if(numWorkers > 1) {
         throw new IOException("When numWorkers > 1 partitionKeys must be set. Set partitionKeys=none to send the entire stream to each worker.");
     }
 
-    if(checkpoint > 0) {
+    if (checkpoint > 0) {
       solrParams.add("fq", "{!frange cost=100 incl=false l="+checkpoint+"}_version_");
     }