You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2020/06/18 00:35:41 UTC
[lucene-solr] 01/02: SOLR-14537: Write doc values in the writer thread instead of the filler.
This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch jira/solr-14537
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit cd6a6c6b3e39a149188dabdae2a10ae93d86b4ce
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Wed Jun 17 23:48:17 2020 +0200
SOLR-14537: Write doc values in the writer thread instead of the filler.
---
.../apache/solr/handler/export/ExportBuffers.java | 88 ++++------------------
.../apache/solr/handler/export/ExportWriter.java | 38 ++++------
.../solr/handler/export/ExportWriterStream.java | 31 +++++++-
.../org/apache/solr/handler/export/LongValue.java | 4 +-
.../solr/handler/export/StringFieldWriter.java | 4 +-
.../apache/solr/handler/export/StringValue.java | 11 ++-
6 files changed, 71 insertions(+), 105 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java b/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
index 57691fe..817e958 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
@@ -35,7 +35,6 @@ import com.codahale.metrics.Timer;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Sort;
import org.apache.solr.common.IteratorWriter;
-import org.apache.solr.common.MapWriter;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.search.SolrIndexSearcher;
@@ -53,7 +52,6 @@ class ExportBuffers {
final Buffer bufferOne;
final Buffer bufferTwo;
- final SortDoc[] outDocs;
final List<LeafReaderContext> leaves;
final ExportWriter exportWriter;
final OutputStream os;
@@ -74,7 +72,7 @@ class ExportBuffers {
ExportBuffers(ExportWriter exportWriter, List<LeafReaderContext> leaves, SolrIndexSearcher searcher,
OutputStream os, IteratorWriter.ItemWriter rawWriter, Sort sort, int queueSize, int totalHits,
- Timer writeOutputBufferTimer, Timer fillerWaitTimer, Timer writerWaitTimer) {
+ Timer writeOutputBufferTimer, Timer fillerWaitTimer, Timer writerWaitTimer) throws IOException {
this.exportWriter = exportWriter;
this.leaves = leaves;
this.os = os;
@@ -90,23 +88,25 @@ class ExportBuffers {
this.writeOutputBufferTimer = writeOutputBufferTimer;
this.fillerWaitTimer = fillerWaitTimer;
this.writerWaitTimer = writerWaitTimer;
- this.outDocs = new SortDoc[queueSize];
- this.bufferOne = new Buffer();
- this.bufferTwo = new Buffer();
+ this.bufferOne = new Buffer(queueSize);
+ this.bufferTwo = new Buffer(queueSize);
this.totalHits = totalHits;
fillBuffer = bufferOne;
outputBuffer = bufferTwo;
+ SortDoc writerSortDoc = exportWriter.getSortDoc(searcher, sort.getSort());
+ bufferOne.initialize(writerSortDoc);
+ bufferTwo.initialize(writerSortDoc);
barrier = new CyclicBarrier(2, () -> swapBuffers());
filler = () -> {
try {
log.info("--- filler start " + Thread.currentThread());
- Buffer buffer = getFillBuffer();
SortDoc sortDoc = exportWriter.getSortDoc(searcher, sort.getSort());
+ Buffer buffer = getFillBuffer();
SortQueue queue = new SortQueue(queueSize, sortDoc);
long lastOutputCounter = 0;
for (int count = 0; count < totalHits; ) {
log.info("--- filler fillOutDocs in " + fillBuffer);
- exportWriter.fillOutDocs(leaves, sortDoc, queue, outDocs, buffer);
+ exportWriter.fillOutDocs(leaves, sortDoc, queue, buffer);
count += (buffer.outDocsIndex + 1);
log.info("--- filler count=" + count + ", exchange buffer from " + buffer);
Timer.Context timerContext = getFillerWaitTimer().time();
@@ -241,78 +241,22 @@ class ExportBuffers {
}
}
- public static final class Buffer implements MapWriter.EntryWriter {
+ public static final class Buffer {
static final int EMPTY = -1;
static final int NO_MORE_DOCS = -2;
int outDocsIndex = EMPTY;
- // use array-of-arrays instead of Map to conserve space
- Object[][] outDocs;
- int pos = EMPTY;
+ SortDoc[] outDocs;
- MapWriter.EntryWriter getEntryWriter(int pos, int numFields) {
- if (outDocs == null) {
- outDocs = new Object[outDocsIndex + 1][];
- }
- this.pos = pos;
- Object[] fields = new Object[numFields << 1];
- outDocs[pos] = fields;
- return this;
+ public Buffer(int size) {
+ outDocs = new SortDoc[size];
}
- @Override
- public MapWriter.EntryWriter put(CharSequence k, Object v) throws IOException {
- if (pos < 0) {
- throw new IOException("Invalid entry position");
- }
- Object[] fields = outDocs[pos];
- boolean putOk = false;
- for (int i = 0; i < fields.length; i += 2) {
- if (fields[i] == null || fields[i].equals(k)) {
- fields[i] = k;
- // convert everything complex into POJOs at this point
- // to avoid accessing docValues or termEnums from other threads
- if (v instanceof IteratorWriter) {
- List lst = new ArrayList();
- ((IteratorWriter)v).toList(lst);
- v = lst;
- } else if (v instanceof MapWriter) {
- Map<String, Object> map = new HashMap<>();
- ((MapWriter)v).toMap(map);
- v = map;
- }
- fields[i + 1] = v;
- putOk = true;
- break;
- }
- }
- if (!putOk) {
- throw new IOException("should not happen! pos=" + pos + " ran out of space for field " + k + "=" + v
- + " - already full: " + Arrays.toString(fields));
- }
- return this;
- }
-
- // helper method to make it easier to write our internal key-value array as if it were a map
- public void writeItem(int pos, IteratorWriter.ItemWriter itemWriter) throws IOException {
- final Object[] fields = outDocs[pos];
- if (fields == null) {
- return;
- }
- itemWriter.add((MapWriter) ew -> {
- for (int i = 0; i < fields.length; i += 2) {
- if (fields[i] == null) {
- continue;
- }
- ew.put((CharSequence)fields[i], fields[i + 1]);
- }
- });
- }
-
- public void reset() {
+ public void initialize(SortDoc proto) {
outDocsIndex = EMPTY;
- pos = EMPTY;
- outDocs = null;
+ for (int i = 0; i < outDocs.length; i++) {
+ outDocs[i] = proto.copy();
+ }
}
@Override
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
index 26bc841..7dbeac2 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
@@ -321,21 +321,25 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
}
}
- protected void transferBatchToBufferForOutput(SortQueue queue, SortDoc[] outDocs,
- List<LeafReaderContext> leaves, ExportBuffers.Buffer destination) throws IOException {
+ protected void transferBatchToBufferForOutput(SortQueue queue,
+ List<LeafReaderContext> leaves,
+ ExportBuffers.Buffer destination) throws IOException {
Timer.Context timerContext = transferBatchToBufferTimer.time();
try {
int outDocsIndex = -1;
for (int i = 0; i < queue.maxSize; i++) {
SortDoc s = queue.pop();
if (s.docId > -1) {
- outDocs[++outDocsIndex] = s;
+ destination.outDocs[++outDocsIndex].setValues(s);
// remove this doc id from the matching bitset, it's been exported
sets[s.ord].clear(s.docId);
+ s.reset(); // reuse
}
}
destination.outDocsIndex = outDocsIndex;
- materializeDocs(leaves, outDocs, destination);
+ } catch (Throwable t) {
+ log.error("transfer", t);
+ throw t;
} finally {
timerContext.stop();
}
@@ -398,7 +402,8 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
// we're using the raw writer here because there's no potential
// reduction in the number of output items, unlike when using
// streaming expressions
- buffer.writeItem(i, writer);
+ final SortDoc currentDoc = buffer.outDocs[i];
+ writer.add((MapWriter) ew -> writeDoc(currentDoc, leaves, ew, fieldWriters));
}
} finally {
timerContext.stop();
@@ -419,38 +424,25 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
}
void fillOutDocs(List<LeafReaderContext> leaves, SortDoc sortDoc,
- SortQueue sortQueue, SortDoc[] outDocs, ExportBuffers.Buffer buffer) throws IOException {
+ SortQueue sortQueue, ExportBuffers.Buffer buffer) throws IOException {
identifyLowestSortingUnexportedDocs(leaves, sortDoc, sortQueue);
- transferBatchToBufferForOutput(sortQueue, outDocs, leaves, buffer);
- }
-
- private void materializeDocs(List<LeafReaderContext> leaves, SortDoc[] outDocs, ExportBuffers.Buffer buffer) throws IOException {
- log.info("--- materialize docs in " + buffer);
- if (buffer.outDocsIndex < 0) {
- return;
- }
- for (int i = buffer.outDocsIndex; i >= 0; i--) {
- SortDoc sortDoc = outDocs[i];
- EntryWriter ew = buffer.getEntryWriter(i, fieldWriters.length);
- writeDoc(sortDoc, leaves, ew);
- sortDoc.reset();
- }
+ transferBatchToBufferForOutput(sortQueue, leaves, buffer);
}
void writeDoc(SortDoc sortDoc,
List<LeafReaderContext> leaves,
- EntryWriter ew) throws IOException {
+ EntryWriter ew, FieldWriter[] writers) throws IOException {
int ord = sortDoc.ord;
LeafReaderContext context = leaves.get(ord);
int fieldIndex = 0;
- for (FieldWriter fieldWriter : fieldWriters) {
+ for (FieldWriter fieldWriter : writers) {
if (fieldWriter.write(sortDoc, context.reader(), ew, fieldIndex)) {
++fieldIndex;
}
}
}
- protected FieldWriter[] getFieldWriters(String[] fields, SolrIndexSearcher searcher) throws IOException {
+ public FieldWriter[] getFieldWriters(String[] fields, SolrIndexSearcher searcher) throws IOException {
IndexSchema schema = searcher.getSchema();
FieldWriter[] writers = new FieldWriter[fields.length];
for (int i = 0; i < fields.length; i++) {
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java
index 1ff8c84..bbdce58 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java
@@ -18,7 +18,10 @@ package org.apache.solr.handler.export;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.TimeoutException;
@@ -36,6 +39,8 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.IteratorWriter;
+import org.apache.solr.common.MapWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +52,7 @@ import org.slf4j.LoggerFactory;
*/
public class ExportWriterStream extends TupleStream implements Expressible {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ final TupleEntryWriter tupleEntryWriter = new TupleEntryWriter();
StreamContext context;
StreamComparator streamComparator;
int pos = -1;
@@ -54,6 +60,25 @@ public class ExportWriterStream extends TupleStream implements Expressible {
ExportBuffers.Buffer buffer;
Timer.Context writeOutputTimerContext;
+ private static final class TupleEntryWriter implements EntryWriter {
+ Tuple tuple;
+
+ @Override
+ public EntryWriter put(CharSequence k, Object v) throws IOException {
+ if (v instanceof IteratorWriter) {
+ List lst = new ArrayList();
+ ((IteratorWriter)v).toList(lst);
+ v = lst;
+ } else if (v instanceof MapWriter) {
+ Map<String, Object> map = new HashMap<>();
+ ((MapWriter)v).toMap(map);
+ v = map;
+ }
+ tuple.put(k.toString(), v);
+ return this;
+ }
+ }
+
public ExportWriterStream(StreamExpression expression, StreamFactory factory) throws IOException {
streamComparator = parseComp(factory.getDefaultSort());
}
@@ -188,9 +213,11 @@ public class ExportWriterStream extends TupleStream implements Expressible {
if (writeOutputTimerContext == null) {
writeOutputTimerContext = exportBuffers.getWriteOutputBufferTimer().time();
}
- res = new Tuple(buffer.outDocs[pos]);
+ SortDoc sortDoc = buffer.outDocs[pos];
+ tupleEntryWriter.tuple = new Tuple();
+ exportBuffers.exportWriter.writeDoc(sortDoc, exportBuffers.leaves, tupleEntryWriter, exportBuffers.exportWriter.fieldWriters);
pos--;
- return res;
+ return tupleEntryWriter.tuple;
}
@Override
diff --git a/solr/core/src/java/org/apache/solr/handler/export/LongValue.java b/solr/core/src/java/org/apache/solr/handler/export/LongValue.java
index dce365b..63794ad 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/LongValue.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/LongValue.java
@@ -25,10 +25,10 @@ import org.apache.lucene.index.NumericDocValues;
public class LongValue implements SortValue {
+ final protected String field;
+ final protected LongComp comp;
protected NumericDocValues vals;
- protected String field;
protected long currentValue;
- protected LongComp comp;
private int lastDocID;
private boolean present;
diff --git a/solr/core/src/java/org/apache/solr/handler/export/StringFieldWriter.java b/solr/core/src/java/org/apache/solr/handler/export/StringFieldWriter.java
index b82c365..7fb24ae 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/StringFieldWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/StringFieldWriter.java
@@ -73,9 +73,9 @@ class StringFieldWriter extends FieldWriter {
ew.put(this.field, utf8.reset(ref.bytes, ref.offset, ref.length, null));
} else {
String v = null;
- if(sortValue != null) {
+ if (sortValue != null) {
v = ((StringValue) sortValue).getLastString();
- if(v == null) {
+ if (v == null) {
fieldType.indexedToReadable(ref, cref);
v = cref.toString();
((StringValue) sortValue).setLastString(v);
diff --git a/solr/core/src/java/org/apache/solr/handler/export/StringValue.java b/solr/core/src/java/org/apache/solr/handler/export/StringValue.java
index fc70565..2ea25d5 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/StringValue.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/StringValue.java
@@ -31,13 +31,13 @@ class StringValue implements SortValue {
protected SortedDocValues globalDocValues;
- protected OrdinalMap ordinalMap;
+ final protected OrdinalMap ordinalMap;
protected LongValues toGlobal = LongValues.IDENTITY; // this segment to global ordinal. NN;
protected SortedDocValues docValues;
- protected String field;
+ final protected String field;
protected int currentOrd;
- protected IntComp comp;
+ final protected IntComp comp;
protected int lastDocID;
private boolean present;
@@ -50,6 +50,8 @@ class StringValue implements SortValue {
this.docValues = globalDocValues;
if (globalDocValues instanceof MultiDocValues.MultiSortedDocValues) {
this.ordinalMap = ((MultiDocValues.MultiSortedDocValues) globalDocValues).mapping;
+ } else {
+ this.ordinalMap = null;
}
this.field = field;
this.comp = comp;
@@ -66,7 +68,8 @@ class StringValue implements SortValue {
}
public StringValue copy() {
- return new StringValue(globalDocValues, field, comp);
+ StringValue copy = new StringValue(globalDocValues, field, comp);
+ return copy;
}
public void setCurrentValue(int docId) throws IOException {