Posted to commits@lucene.apache.org by dw...@apache.org on 2021/03/10 09:45:01 UTC
[lucene] 01/03: SOLR-15210: Initial patch, no tests yet.
This is an automated email from the ASF dual-hosted git repository.
dweiss pushed a commit to branch jira/solr-15210
in repository https://gitbox.apache.org/repos/asf/lucene.git
commit 5e4242ff69081971839721d46d29655bd8f4e445
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Thu Mar 4 18:43:43 2021 +0100
SOLR-15210: Initial patch, no tests yet.
---
.../apache/solr/handler/export/ExportBuffers.java | 2 +-
.../apache/solr/handler/export/ExportWriter.java | 289 +++++++++++++++++----
.../solr/handler/export/ExportWriterStream.java | 137 +++++-----
.../java/org/apache/solr/search/CaffeineCache.java | 10 +-
.../client/solrj/io/stream/ParallelStream.java | 10 +-
.../solr/client/solrj/io/stream/SolrStream.java | 10 +-
6 files changed, 334 insertions(+), 124 deletions(-)
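In short: when numWorkers > 1 and useHashQuery=false, each worker's export
request now filters documents server-side in ExportWriter instead of via a
hash-query fq, and caches the resulting per-segment partition bitsets for
reuse by identical requests. A minimal sketch of the parameters the export
side consumes (names taken from the ParallelStream constants added below;
partitioning on an "id" field is a hypothetical example):

    numWorkers=4&workerID=2&partitionKeys=id&useHashQuery=false

A document is kept by worker 2 only when the summed hash of its partition
key values satisfies (hash & 0x7FFFFFFF) % 4 == 2.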
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java b/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
index 9e5478f..59c575c 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
@@ -100,7 +100,7 @@ class ExportBuffers {
long lastOutputCounter = 0;
for (int count = 0; count < totalHits; ) {
// log.debug("--- filler fillOutDocs in {}", fillBuffer);
- exportWriter.fillOutDocs(mergeIterator, buffer);
+ exportWriter.fillNextBuffer(mergeIterator, buffer);
count += (buffer.outDocsIndex + 1);
// log.debug("--- filler count={}, exchange buffer from {}", count, buffer);
try {
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
index f2d0b9b..df03b40 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
@@ -24,19 +24,21 @@ import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.TreeSet;
+import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
+import org.apache.lucene.util.BitSet;
import org.apache.lucene.util.BitSetIterator;
import org.apache.lucene.util.FixedBitSet;
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.ParallelStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
@@ -52,6 +54,7 @@ import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.StreamParams;
import org.apache.solr.common.util.JavaBinCodec;
+import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.SolrCore;
import org.apache.solr.metrics.SolrMetricsContext;
import org.apache.solr.request.SolrQueryRequest;
@@ -71,9 +74,7 @@ import org.apache.solr.schema.LongValueFieldType;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.schema.SortableTextField;
import org.apache.solr.schema.StrField;
+import org.apache.solr.search.CaffeineCache;
+import org.apache.solr.search.SolrCache;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.search.SortSpec;
import org.apache.solr.search.SyntaxError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,6 +98,8 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
public static final String BATCH_SIZE_PARAM = "batchSize";
public static final String QUEUE_SIZE_PARAM = "queueSize";
+ public static final String SOLR_CACHE_KEY = "exportCache";
+
public static final int DEFAULT_BATCH_SIZE = 30000;
public static final int DEFAULT_QUEUE_SIZE = 150000;
@@ -117,13 +120,24 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
int totalHits = 0;
FixedBitSet[] sets = null;
PushWriter writer;
- private String wt;
+ private final String wt;
+ final int numWorkers;
+ final int workerId;
+ final String fieldList;
+ final List<String> partitionKeys;
+ final String partitionCacheKey;
+
+ // per-segment caches for already populated partitioning filters when parallel() is in use
+ final SolrCache<IndexReader.CacheKey, SolrCache<String, FixedBitSet>> partitionCaches;
+
+ // per-segment partitioning filters that are incomplete (still being updated from the current request)
+ final Map<IndexReader.CacheKey, FixedBitSet> tempPartitionCaches;
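+ // (these are promoted into partitionCaches when writeDocs() completes - see
+ // transferTempPartitionCaches())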
public ExportWriter(SolrQueryRequest req, SolrQueryResponse res, String wt,
StreamContext initialStreamContext, SolrMetricsContext solrMetricsContext,
- String metricsPath) {
+ String metricsPath) throws Exception {
this.req = req;
this.res = res;
this.wt = wt;
@@ -131,14 +145,35 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
this.solrMetricsContext = solrMetricsContext;
this.metricsPath = metricsPath;
this.priorityQueueSize = req.getParams().getInt(QUEUE_SIZE_PARAM, DEFAULT_QUEUE_SIZE);
+ this.numWorkers = req.getParams().getInt(ParallelStream.NUM_WORKERS_PARAM, 1);
+ this.workerId = req.getParams().getInt(ParallelStream.WORKER_ID_PARAM, 0);
+ boolean useHashQuery = req.getParams().getBool(ParallelStream.USE_HASH_QUERY_PARAM, false);
+ if (numWorkers > 1 && !useHashQuery) {
+ String keysList = req.getParams().get(ParallelStream.PARTITION_KEYS_PARAM);
+ if (keysList == null || keysList.trim().equals("none")) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "when numWorkers > 1, partitionKeys MUST be specified!");
+ }
+ partitionKeys = StrUtils.splitSmart(keysList, ',', true);
+ // we have to use ALL parameters as a cache key to account for different queries
+ partitionCacheKey = req.getParamString();
+ tempPartitionCaches = new HashMap<>();
+ } else {
+ partitionKeys = null;
+ partitionCacheKey = null;
+ tempPartitionCaches = null;
+ }
+ this.fieldList = req.getParams().get(CommonParams.FL);
this.batchSize = DEFAULT_BATCH_SIZE;
+ this.partitionCaches = req.getSearcher().getCache(SOLR_CACHE_KEY);
}
@Override
public String getContentType() {
if ("javabin".equals(wt)) {
return BinaryResponseParser.BINARY_CONTENT_TYPE;
- } else return "json";
+ } else {
+ return "json";
+ }
}
@Override
@@ -237,15 +272,14 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
}
}
SolrParams params = req.getParams();
- String fl = params.get("fl");
String[] fields = null;
- if (fl == null) {
+ if (fieldList == null) {
writeException((new IOException(new SyntaxError("export field list (fl) must be specified."))), writer, true);
return;
} else {
- fields = fl.split(",");
+ fields = fieldList.split(",");
for (int i = 0; i < fields.length; i++) {
@@ -265,6 +299,8 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
return;
}
+ outputDoc = new DoubleArrayMapWriter(fieldWriters.length);
+
String expr = params.get(StreamParams.EXPR);
if (expr != null) {
StreamFactory streamFactory = initialStreamContext.getStreamFactory();
@@ -321,34 +357,12 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
return tupleStream;
}
- private void transferBatchToBufferForOutput(MergeIterator mergeIterator,
- ExportBuffers.Buffer destination) throws IOException {
- try {
- int outDocsIndex = -1;
- for (int i = 0; i < batchSize; i++) {
- SortDoc sortDoc = mergeIterator.next();
- if (sortDoc != null) {
- destination.outDocs[++outDocsIndex].setValues(sortDoc);
- } else {
- break;
- }
- }
- destination.outDocsIndex = outDocsIndex;
- } catch (Throwable t) {
- log.error("transfer", t);
- if (t instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- throw t;
- } finally {
-
- }
- }
-
protected void writeDocs(SolrQueryRequest req, OutputStream os, IteratorWriter.ItemWriter writer, Sort sort) throws IOException {
List<LeafReaderContext> leaves = req.getSearcher().getTopReaderContext().leaves();
final int queueSize = Math.min(batchSize, totalHits);
-
+ if (tempPartitionCaches != null && partitionCaches != null) {
+ initTempPartitionCaches(leaves);
+ }
ExportBuffers buffers = new ExportBuffers(this,
leaves,
@@ -417,7 +431,10 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
// reduction in the number of output items, unlike when using
// streaming expressions
final SortDoc currentDoc = buffer.outDocs[i];
- writer.add((MapWriter) ew -> writeDoc(currentDoc, leaves, ew, fieldWriters));
+ MapWriter outputDoc = fillOutputDoc(currentDoc, leaves, fieldWriters);
+ if (outputDoc != null) {
+ writer.add(outputDoc);
+ }
}
} finally {
}
@@ -437,24 +454,172 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
return true;
});
}
+ transferTempPartitionCaches();
+ }
+
+ /**
+ * This method transfers the newly built per-segment partitioning bitsets to the global cache,
+ * keyed by the current query.
+ */
+ private void transferTempPartitionCaches() {
+ if (tempPartitionCaches == null || partitionCaches == null) {
+ return;
+ }
+ tempPartitionCaches.forEach((cacheKey, partitionSet) -> {
+ SolrCache<String, FixedBitSet> perSegmentCache = partitionCaches.computeIfAbsent(cacheKey, k -> {
+ CaffeineCache<String, FixedBitSet> cache = new CaffeineCache<>();
+ cache.init(
+ Map.of(
+ // 100 unique queries should be enough for anyone ;)
+ SolrCache.SIZE_PARAM, "100",
+ // evict entries after 600 sec
+ SolrCache.MAX_IDLE_TIME_PARAM, "600"),
+ null, null);
+ return cache;
+ });
+ // use our unique query+numWorkers+worker key
+ perSegmentCache.put(partitionCacheKey, partitionSet);
+ });
+ }
+
+ // this inits only those sets that are not already present in the global cache
+ private void initTempPartitionCaches(List<LeafReaderContext> leaves) {
+ tempPartitionCaches.clear();
+ for (LeafReaderContext leaf : leaves) {
+ IndexReader.CacheHelper cacheHelper = leaf.reader().getReaderCacheHelper();
+ if (cacheHelper == null) {
+ continue;
+ }
+ IndexReader.CacheKey cacheKey = cacheHelper.getKey();
+ // check if bitset was computed earlier and can be skipped
+ SolrCache<String, FixedBitSet> perSegmentCache = partitionCaches.get(cacheKey);
+ if (perSegmentCache != null && perSegmentCache.get(partitionCacheKey) != null) {
+ // already computed earlier
+ continue;
+ }
+ tempPartitionCaches.put(cacheKey, new FixedBitSet(leaf.reader().maxDoc()));
+ }
+ }
+
+ void fillNextBuffer(MergeIterator mergeIterator,
+ ExportBuffers.Buffer buffer) throws IOException {
+ try {
+ int outDocsIndex = -1;
+ for (int i = 0; i < batchSize; i++) {
+ SortDoc sortDoc = mergeIterator.next();
+ if (sortDoc != null) {
+ buffer.outDocs[++outDocsIndex].setValues(sortDoc);
+ } else {
+ break;
+ }
+ }
+ buffer.outDocsIndex = outDocsIndex;
+ } catch (Throwable t) {
+ log.error("transfer", t);
+ if (t instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ throw t;
+ }
}
- void fillOutDocs(MergeIterator mergeIterator,
- ExportBuffers.Buffer buffer) throws IOException {
- transferBatchToBufferForOutput(mergeIterator, buffer);
+ private static final class DoubleArrayMapWriter implements MapWriter, EntryWriter {
+ final CharSequence[] keys;
+ final Object[] values;
+ int pos;
+
+ DoubleArrayMapWriter(int size) {
+ keys = new CharSequence[size];
+ values = new Object[size];
+ pos = 0;
+ }
+
+ @Override
+ public EntryWriter put(CharSequence k, Object v) throws IOException {
+ keys[pos] = k;
+ values[pos] = v;
+ pos++;
+ return this;
+ }
+
+ public void clear() {
+ for (int i = 0; i < pos; i++) {
+ keys[i] = null;
+ values[i] = null;
+ }
+ pos = 0;
+ }
+
+ @Override
+ public void writeMap(EntryWriter ew) throws IOException {
+ for (int i = 0; i < pos; i++) {
+ ew.put(keys[i], values[i]);
+ }
+ }
+
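+ // lookup by linear scan - acceptable for the small number of fields
+ // in an export fl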
+ public Object get(CharSequence key) {
+ for (int i = 0; i < pos; i++) {
+ if (keys[i].equals(key)) {
+ return values[i];
+ }
+ }
+ return null;
+ }
}
- void writeDoc(SortDoc sortDoc,
- List<LeafReaderContext> leaves,
- EntryWriter ew, FieldWriter[] writers) throws IOException {
+ // we materialize this document so that we can potentially do hash partitioning
+ private DoubleArrayMapWriter outputDoc;
+
+ // WARNING: single-thread only! shared var outputDoc
+ MapWriter fillOutputDoc(SortDoc sortDoc,
+ List<LeafReaderContext> leaves,
+ FieldWriter[] writers) throws IOException {
int ord = sortDoc.ord;
LeafReaderContext context = leaves.get(ord);
+ // reuse the shared output document
+ outputDoc.clear();
int fieldIndex = 0;
for (FieldWriter fieldWriter : writers) {
- if (fieldWriter.write(sortDoc, context, ew, fieldIndex)) {
+ if (fieldWriter.write(sortDoc, context, outputDoc, fieldIndex)) {
++fieldIndex;
}
}
+ if (partitionKeys != null) {
+ // if we use partitioning then filter out unwanted docs
+ return partitionFilter(sortDoc, context, outputDoc);
+ } else {
+ return outputDoc;
+ }
+ }
+
+ MapWriter partitionFilter(SortDoc sortDoc, LeafReaderContext leaf, DoubleArrayMapWriter doc) {
+ // calculate hash
+ int hash = 0;
+ for (String key : partitionKeys) {
+ Object value = doc.get(key);
+ if (value != null) {
+ hash += value.hashCode();
+ }
+ }
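+ // e.g. with numWorkers=4 and workerId=2 a doc whose summed key hash is 6
+ // is kept, because (6 & 0x7FFFFFFF) % 4 == 2; the mask clears the sign bit
+ // so a negative hash can't produce a negative partition id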
+ if ((hash & 0x7FFFFFFF) % numWorkers == workerId) {
+ // our partition
+ // check if we should mark it in the partitionSet
+ IndexReader.CacheHelper cacheHelper = leaf.reader().getReaderCacheHelper();
+ if (cacheHelper != null) {
+ IndexReader.CacheKey cacheKey = cacheHelper.getKey();
+ FixedBitSet partitionSet = tempPartitionCaches.get(cacheKey);
+ if (partitionSet != null) {
+ // not computed before - mark it
+ partitionSet.set(sortDoc.docId);
+ }
+ }
+ return doc;
+ } else {
+ // XXX update the cache
+ return null;
+ }
}
public FieldWriter[] getFieldWriters(String[] fields, SolrIndexSearcher searcher) throws IOException {
@@ -702,7 +867,9 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
SegmentIterator[] segmentIterators = new SegmentIterator[leaves.size()];
for (int i = 0; i < segmentIterators.length; i++) {
SortQueue sortQueue = new SortQueue(sizes[i], sortDoc.copy());
- segmentIterators[i] = new SegmentIterator(bits[i], leaves.get(i), sortQueue, sortDoc.copy());
+ // check if we have an existing partition filter and use it if present
+ FixedBitSet myPartitionSet = partitionCacheKey != null ? getMyPartitionSet(leaves.get(i)) : null;
+ segmentIterators[i] = new SegmentIterator(bits[i], myPartitionSet, leaves.get(i), sortQueue, sortDoc.copy());
}
return new MergeIterator(segmentIterators, sortDoc);
@@ -710,6 +877,24 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
}
}
+ private FixedBitSet getMyPartitionSet(LeafReaderContext leaf) {
+ if (partitionCaches == null) {
+ return null;
+ }
+ IndexReader.CacheHelper cacheHelper = leaf.reader().getReaderCacheHelper();
+ if (cacheHelper == null) {
+ return null;
+ }
+ IndexReader.CacheKey cacheKey = cacheHelper.getKey();
+
+ SolrCache<String, FixedBitSet> perSegmentCaches = partitionCaches.get(cacheKey);
+ if (perSegmentCaches == null) {
+ // no queries yet for this segment
+ return null;
+ }
+ return perSegmentCaches.get(partitionCacheKey);
+ }
+
private static class SegmentIterator {
private final FixedBitSet bits;
@@ -722,8 +907,20 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
private int index;
- public SegmentIterator(FixedBitSet bits, LeafReaderContext context, SortQueue sortQueue, SortDoc sortDoc) throws IOException {
+ /**
+ * Construct per-segment iterator for matching docs.
+ * @param bits matching document id-s in the segment
+ * @param myPartitionSet filter to match only the docs in the current worker's partition, may be
+ * null if not partitioning
+ * @param context segment context
+ * @param sortQueue sort queue
+ * @param sortDoc prototype sort document
+ */
+ public SegmentIterator(FixedBitSet bits, FixedBitSet myPartitionSet, LeafReaderContext context, SortQueue sortQueue, SortDoc sortDoc) throws IOException {
this.bits = bits;
+ if (myPartitionSet != null) {
+ this.bits.and(myPartitionSet);
+ }
this.queue = sortQueue;
this.sortDoc = sortDoc;
this.nextDoc = sortDoc.copy();
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java
index 3d0b3b1..f923a3d 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java
@@ -137,81 +137,86 @@ public class ExportWriterStream extends TupleStream implements Expressible {
@Override
public Tuple read() throws IOException {
Tuple res = null;
- if (pos < 0) {
-
- try {
- buffer.outDocsIndex = ExportBuffers.Buffer.EMPTY;
- //log.debug("--- ews exchange empty buffer {}", buffer);
- boolean exchanged = false;
- while (!exchanged) {
- try {
- long startExchangeBuffers = System.nanoTime();
- exportBuffers.exchangeBuffers();
- long endExchangeBuffers = System.nanoTime();
- if(log.isDebugEnabled()) {
- log.debug("Waited for reader thread:{}", Long.toString(((endExchangeBuffers - startExchangeBuffers) / 1000000)));
- }
- exchanged = true;
- } catch (TimeoutException e) {
- log.debug("--- ews timeout loop");
- if (exportBuffers.isShutDown()) {
- log.debug("--- ews - the other end is shutdown, returning EOF");
- res = Tuple.EOF();
- break;
- }
- continue;
- } catch (InterruptedException e) {
- log.debug("--- ews interrupted");
- exportBuffers.error(e);
- res = Tuple.EXCEPTION(e, true);
- break;
- } catch (BrokenBarrierException e) {
- if (exportBuffers.getError() != null) {
- res = Tuple.EXCEPTION(exportBuffers.getError(), true);
- } else {
+ do {
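+ // keep looping until we produce a tuple, EOF or an error - fillOutputDoc()
+ // returns null for docs outside this worker's partition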
+ if (pos < 0) {
+
+ try {
+ buffer.outDocsIndex = ExportBuffers.Buffer.EMPTY;
+ //log.debug("--- ews exchange empty buffer {}", buffer);
+ boolean exchanged = false;
+ while (!exchanged) {
+ try {
+ long startExchangeBuffers = System.nanoTime();
+ exportBuffers.exchangeBuffers();
+ long endExchangeBuffers = System.nanoTime();
+ if (log.isDebugEnabled()) {
+ log.debug("Waited for reader thread: {} ms", (endExchangeBuffers - startExchangeBuffers) / 1000000);
+ }
+ exchanged = true;
+ } catch (TimeoutException e) {
+ log.debug("--- ews timeout loop");
+ if (exportBuffers.isShutDown()) {
+ log.debug("--- ews - the other end is shutdown, returning EOF");
+ res = Tuple.EOF();
+ break;
+ }
+ continue;
+ } catch (InterruptedException e) {
+ log.debug("--- ews interrupted");
+ exportBuffers.error(e);
res = Tuple.EXCEPTION(e, true);
+ break;
+ } catch (BrokenBarrierException e) {
+ if (exportBuffers.getError() != null) {
+ res = Tuple.EXCEPTION(exportBuffers.getError(), true);
+ } else {
+ res = Tuple.EXCEPTION(e, true);
+ }
+ break;
+ }
- break;
- } finally {
}
+ } catch (InterruptedException e) {
+ log.debug("--- ews interrupt");
+ exportBuffers.error(e);
+ res = Tuple.EXCEPTION(e, true);
+ } catch (Exception e) {
+ log.debug("--- ews exception", e);
+ exportBuffers.error(e);
+ res = Tuple.EXCEPTION(e, true);
+ }
+ buffer = exportBuffers.getOutputBuffer();
+ if (buffer == null) {
+ res = Tuple.EOF();
+ } else if (buffer.outDocsIndex == ExportBuffers.Buffer.NO_MORE_DOCS) {
+ log.debug("--- ews EOF");
+ res = Tuple.EOF();
+ } else {
+ pos = buffer.outDocsIndex;
+ index = -1; //restart index.
+ log.debug("--- ews new pos={}", pos);
}
- } catch (InterruptedException e) {
- log.debug("--- ews interrupt");
- exportBuffers.error(e);
- res = Tuple.EXCEPTION(e, true);
- } catch (Exception e) {
- log.debug("--- ews exception", e);
- exportBuffers.error(e);
- res = Tuple.EXCEPTION(e, true);
}
- buffer = exportBuffers.getOutputBuffer();
- if (buffer == null) {
+ if (pos < 0) {
+ log.debug("--- ews EOF?");
res = Tuple.EOF();
}
- if (buffer.outDocsIndex == ExportBuffers.Buffer.NO_MORE_DOCS) {
- log.debug("--- ews EOF");
- res = Tuple.EOF();
- } else {
- pos = buffer.outDocsIndex;
- index = -1; //restart index.
- log.debug("--- ews new pos={}", pos);
+ if (res != null) {
+ // only errors or EOF assigned result so far
+ return res;
}
- }
- if (pos < 0) {
- log.debug("--- ews EOF?");
- res = Tuple.EOF();
- }
- if (res != null) {
- // only errors or EOF assigned result so far
- return res;
- }
-
- SortDoc sortDoc = buffer.outDocs[++index];
- tupleEntryWriter.tuple = new Tuple();
- exportBuffers.exportWriter.writeDoc(sortDoc, exportBuffers.leaves, tupleEntryWriter, exportBuffers.exportWriter.fieldWriters);
- pos--;
- return tupleEntryWriter.tuple;
+ SortDoc sortDoc = buffer.outDocs[++index];
+ MapWriter outputDoc = exportBuffers.exportWriter.fillOutputDoc(sortDoc, exportBuffers.leaves, exportBuffers.exportWriter.fieldWriters);
+ pos--;
+ if (outputDoc != null) {
+ tupleEntryWriter.tuple = new Tuple();
+ outputDoc.writeMap(tupleEntryWriter);
+ return tupleEntryWriter.tuple;
+ }
+ } while (res == null);
+ return res;
}
@Override
diff --git a/solr/core/src/java/org/apache/solr/search/CaffeineCache.java b/solr/core/src/java/org/apache/solr/search/CaffeineCache.java
index 756718c..e9ade62 100644
--- a/solr/core/src/java/org/apache/solr/search/CaffeineCache.java
+++ b/solr/core/src/java/org/apache/solr/search/CaffeineCache.java
@@ -25,11 +25,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
@@ -225,6 +221,10 @@ public class CaffeineCache<K, V> extends SolrCacheBase implements SolrCache<K, V
ramBytes.reset();
}
+ public ConcurrentMap<K, V> asMap() {
+ return cache.asMap();
+ }
+
@Override
public int size() {
return cache.asMap().size();
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
index 0d48b0b..b567f86 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
@@ -45,6 +45,11 @@ import static org.apache.solr.common.params.CommonParams.SORT;
**/
public class ParallelStream extends CloudSolrStream implements Expressible {
+ public static final String NUM_WORKERS_PARAM = "numWorkers";
+ public static final String WORKER_ID_PARAM = "workerID";
+ public static final String PARTITION_KEYS_PARAM = "partitionKeys";
+ public static final String USE_HASH_QUERY_PARAM = "useHashQuery";
+
private TupleStream tupleStream;
private int workers;
private transient StreamFactory streamFactory;
@@ -252,8 +257,9 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
for(int w=0; w<workers; w++) {
ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
paramsLoc.set(DISTRIB,"false"); // We are the aggregator.
- paramsLoc.set("numWorkers", workers);
- paramsLoc.set("workerID", w);
+ paramsLoc.set(NUM_WORKERS_PARAM, workers);
+ paramsLoc.set(WORKER_ID_PARAM, w);
+ paramsLoc.set(USE_HASH_QUERY_PARAM, false);
paramsLoc.set("expr", pushStream.toString());
paramsLoc.set("qt","/stream");
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
index e6210ee..58accf5 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
@@ -152,15 +152,17 @@ public class SolrStream extends TupleStream {
private ModifiableSolrParams loadParams(SolrParams paramsIn) throws IOException {
ModifiableSolrParams solrParams = new ModifiableSolrParams(paramsIn);
if (params.get("partitionKeys") != null) {
- if(!params.get("partitionKeys").equals("none") && numWorkers > 1) {
- String partitionFilter = getPartitionFilter();
- solrParams.add("fq", partitionFilter);
+ if (!params.get("partitionKeys").equals("none") && numWorkers > 1) {
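+ // only add the hash-based fq when explicitly requested; for /export
+ // the partitioning now happens server-side in ExportWriter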
+ if (params.getBool(ParallelStream.USE_HASH_QUERY_PARAM, false)) {
+ String partitionFilter = getPartitionFilter();
+ solrParams.add("fq", partitionFilter);
+ }
}
} else if(numWorkers > 1) {
throw new IOException("When numWorkers > 1 partitionKeys must be set. Set partitionKeys=none to send the entire stream to each worker.");
}
- if(checkpoint > 0) {
+ if (checkpoint > 0) {
solrParams.add("fq", "{!frange cost=100 incl=false l="+checkpoint+"}_version_");
}