You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink that is not preserved in this plain-text version.
Posted to commits@lucene.apache.org by ab...@apache.org on 2020/06/10 17:30:06 UTC
[lucene-solr] 01/01: SOLR-14537: Use double buffering to increase throughput.
This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch jira/solr-14537
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 2dba525ae6b37e826eb4a08827ed0e109e88784b
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Wed Jun 10 19:29:28 2020 +0200
SOLR-14537: Use double buffering to increase throughput.
---
.../apache/solr/handler/export/ExportWriter.java | 367 +++++++++++++++------
.../solr/handler/export/SingleValueSortDoc.java | 5 -
.../org/apache/solr/handler/export/SortDoc.java | 4 -
.../org/apache/solr/client/solrj/io/Tuple.java | 4 +
4 files changed, 274 insertions(+), 106 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
index 3eccd0d..813db92 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
@@ -24,7 +24,17 @@ import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Exchanger;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
@@ -58,7 +68,9 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.StreamParams;
+import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.JavaBinCodec;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestInfo;
@@ -106,11 +118,8 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
private static final int DOCUMENT_BATCH_SIZE = 30000;
private static final String EXPORT_WRITER_KEY = "__ew__";
- private static final String SORT_DOCS_KEY = "_ew_docs_";
- private static final String TOTAL_HITS_KEY = "_ew_totalHits_";
- private static final String LEAF_READERS_KEY = "_ew_leaves_";
- private static final String SORT_QUEUE_KEY = "_ew_queue_";
- private static final String SORT_DOC_KEY = "_ew_sort_";
+ private static final String EXPORT_BUFFERS_KEY = "__eb__";
+ private static final String LEAF_READERS_KEY = "__leaves__";
private OutputStreamWriter respWriter;
final SolrQueryRequest req;
@@ -124,33 +133,12 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
PushWriter writer;
private String wt;
- private static class TupleEntryWriter implements EntryWriter {
- Tuple tuple;
-
- void setTuple(Tuple tuple) {
- this.tuple = tuple;
- }
-
- @Override
- public EntryWriter put(CharSequence k, Object v) throws IOException {
- tuple.put(k, v);
- return this;
- }
- }
-
public static class ExportWriterStream extends TupleStream implements Expressible {
StreamContext context;
StreamComparator streamComparator;
int pos = -1;
- int outDocIndex = -1;
- int count;
- SortDoc sortDoc;
- SortQueue queue;
- SortDoc[] docs;
- int totalHits;
- ExportWriter exportWriter;
- List<LeafReaderContext> leaves;
- final TupleEntryWriter entryWriter = new TupleEntryWriter();
+ ExportBuffers exportBuffers;
+ Buffer buffer;
public ExportWriterStream(StreamExpression expression, StreamFactory factory) throws IOException {
streamComparator = parseComp(factory.getDefaultSort());
@@ -170,7 +158,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
String[] sorts = sort.split(",");
StreamComparator[] comps = new StreamComparator[sorts.length];
- for(int i=0; i<sorts.length; i++) {
+ for (int i = 0; i < sorts.length; i++) {
String s = sorts[i];
String[] spec = s.trim().split("\\s+"); //This should take into account spaces in the sort spec.
@@ -185,7 +173,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
comps[i] = new FieldComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING);
}
- if(comps.length > 1) {
+ if (comps.length > 1) {
return new MultipleFieldComparator(comps);
} else {
return comps[0];
@@ -194,40 +182,41 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
@Override
public void open() throws IOException {
- docs = (SortDoc[]) context.get(SORT_DOCS_KEY);
- queue = (SortQueue) context.get(SORT_QUEUE_KEY);
- sortDoc = (SortDoc) context.get(SORT_DOC_KEY);
- totalHits = (Integer) context.get(TOTAL_HITS_KEY);
- exportWriter = (ExportWriter) context.get(EXPORT_WRITER_KEY);
- leaves = (List<LeafReaderContext>) context.get(LEAF_READERS_KEY);
- count = 0;
+ exportBuffers = (ExportBuffers) context.get(EXPORT_BUFFERS_KEY);
+ buffer = exportBuffers.getOutputBuffer();
}
@Override
public void close() throws IOException {
- exportWriter = null;
- leaves = null;
+ exportBuffers = null;
}
@Override
public Tuple read() throws IOException {
if (pos < 0) {
- if (count < totalHits) {
- outDocIndex = exportWriter.fillOutDocs(leaves, sortDoc, queue, docs);
- count += (outDocIndex + 1);
- pos = outDocIndex;
- } else {
+ try {
+ buffer.outDocsIndex = Buffer.EMPTY;
+ log.info("--- ews exchange empty buffer " + buffer);
+ buffer = exportBuffers.exchanger.exchange(buffer);
+ log.info("--- ews got new output buffer " + buffer);
+ } catch (InterruptedException e) {
+ log.info("--- ews interrupt");
+ Thread.currentThread().interrupt();
+ throw new IOException("interrupted");
+ }
+ if (buffer.outDocsIndex == Buffer.NO_MORE_DOCS) {
+ log.info("--- ews EOF");
return Tuple.EOF();
+ } else {
+ pos = buffer.outDocsIndex;
+ log.info("--- ews new pos=" + pos);
}
}
if (pos < 0) {
+ log.info("--- ews EOF?");
return Tuple.EOF();
}
- Tuple tuple = new Tuple();
- entryWriter.setTuple(tuple);
- SortDoc s = docs[pos];
- exportWriter.writeDoc(s, leaves, entryWriter);
- s.reset();
+ Tuple tuple = new Tuple(buffer.outDocs[pos]);
pos--;
return tuple;
}
@@ -442,41 +431,197 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
}
}
- protected int transferBatchToArrayForOutput(SortQueue queue, SortDoc[] destinationArr) {
+ protected void transferBatchToBufferForOutput(SortQueue queue, SortDoc[] outDocs,
+ List<LeafReaderContext> leaves, Buffer destination) throws IOException {
int outDocsIndex = -1;
for (int i = 0; i < queue.maxSize; i++) {
SortDoc s = queue.pop();
if (s.docId > -1) {
- destinationArr[++outDocsIndex] = s;
+ outDocs[++outDocsIndex] = s;
}
}
-
- return outDocsIndex;
+ destination.outDocsIndex = outDocsIndex;
+ materializeDocs(leaves, outDocs, destination);
}
- protected void addDocsToItemWriter(List<LeafReaderContext> leaves, IteratorWriter.ItemWriter writer, SortDoc[] docsToExport, int outDocsIndex) throws IOException {
- try {
- for (int i = outDocsIndex; i >= 0; --i) {
- SortDoc s = docsToExport[i];
- writer.add((MapWriter) ew -> {
- writeDoc(s, leaves, ew);
- s.reset();
- });
+ private static final class Buffer implements EntryWriter {
+ static final int EMPTY = -1;
+ static final int NO_MORE_DOCS = -2;
+
+ int outDocsIndex = EMPTY;
+ // use array-of-arrays instead of Map to conserve space
+ Object[][] outDocs;
+ int pos = EMPTY;
+
+ EntryWriter getEntryWriter(int pos, int numFields) {
+ if (outDocs == null) {
+ outDocs = new Object[outDocsIndex + 1][];
+ }
+ this.pos = pos;
+ Object[] fields = new Object[numFields << 1];
+ outDocs[pos] = fields;
+ return this;
+ }
+
+ @Override
+ public EntryWriter put(CharSequence k, Object v) throws IOException {
+ if (pos < 0) {
+ throw new IOException("Invalid entry position");
}
- } catch (Throwable e) {
- Throwable ex = e;
- while (ex != null) {
- String m = ex.getMessage();
- if (m != null && m.contains("Broken pipe")) {
- throw new IgnoreException();
+ Object[] fields = outDocs[pos];
+ boolean putOk = false;
+ for (int i = 0; i < fields.length; i += 2) {
+ if (fields[i] == null || fields[i].equals(k)) {
+ fields[i] = k;
+ // convert everything complex into POJOs at this point
+ if (v instanceof IteratorWriter) {
+ List lst = new ArrayList();
+ ((IteratorWriter)v).toList(lst);
+ v = lst;
+ } else if (v instanceof MapWriter) {
+ Map<String, Object> map = new HashMap<>();
+ ((MapWriter)v).toMap(map);
+ v = map;
+ }
+ fields[i + 1] = v;
+ putOk = true;
+ break;
}
- ex = ex.getCause();
}
+ if (!putOk) {
+ throw new IOException("should not happen! pos=" + pos + " ran out of space for field " + k + "=" + v
+ + " - already full: " + Arrays.toString(fields));
+ }
+ return this;
+ }
- if (e instanceof IOException) {
- throw ((IOException) e);
- } else {
- throw new IOException(e);
+ public void writeItem(int pos, IteratorWriter.ItemWriter itemWriter) throws IOException {
+ final Object[] fields = outDocs[pos];
+ if (fields == null) {
+ log.info("--- writeItem no fields at " + pos);
+ return;
+ }
+ itemWriter.add((MapWriter) ew -> {
+ for (int i = 0; i < fields.length; i += 2) {
+ if (fields[i] == null) {
+ log.info("--- writeItem empty field at " + pos + "/" + i);
+ continue;
+ }
+ log.info("--- writeItem field " + pos + "/" + i);
+ ew.put((CharSequence)fields[i], fields[i + 1]);
+ log.info("--- writeItem done field " + pos + "/" + i);
+ }
+ });
+ }
+
+ public void reset() {
+ outDocsIndex = EMPTY;
+ pos = EMPTY;
+ outDocs = null;
+ }
+
+ @Override
+ public String toString() {
+ return "Buffer@" + Integer.toHexString(hashCode()) + "{" +
+ "outDocsIndex=" + outDocsIndex +
+ '}';
+ }
+ }
+
+ /* Helper class implementing a "double buffering" producer / consumer. */
+ private static final class ExportBuffers {
+ static final long EXCHANGE_TIMEOUT_SECONDS = 300;
+
+ final Buffer bufferOne;
+ final Buffer bufferTwo;
+ final Exchanger<Buffer> exchanger = new Exchanger<>();
+ final SortDoc[] outDocs;
+ final List<LeafReaderContext> leaves;
+ final ExportWriter exportWriter;
+ final int totalHits;
+ Buffer fillBuffer;
+ Buffer outputBuffer;
+ Runnable filler;
+ ExecutorService service;
+
+ ExportBuffers (ExportWriter exportWriter, List<LeafReaderContext> leaves, SolrIndexSearcher searcher, Sort sort, int queueSize, int totalHits) {
+ this.exportWriter = exportWriter;
+ this.leaves = leaves;
+ this.outDocs = new SortDoc[queueSize];
+ this.bufferOne = new Buffer();
+ this.bufferTwo = new Buffer();
+ this.totalHits = totalHits;
+ fillBuffer = bufferOne;
+ outputBuffer = bufferTwo;
+ filler = () -> {
+ try {
+ SortDoc sortDoc = exportWriter.getSortDoc(searcher, sort.getSort());
+ SortQueue queue = new SortQueue(queueSize, sortDoc);
+ for (int count = 0; count < totalHits; ) {
+ log.info("--- filler fillOutDocs in " + fillBuffer);
+ exportWriter.fillOutDocs(leaves, sortDoc, queue, outDocs, fillBuffer);
+ count += (fillBuffer.outDocsIndex + 1);
+ log.info("--- filler count=" + count + ", exchange buffer from " + fillBuffer);
+ fillBuffer = exchanger.exchange(fillBuffer, EXCHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ log.info("--- filler got empty buffer " + fillBuffer);
+ }
+ fillBuffer.outDocsIndex = Buffer.NO_MORE_DOCS;
+ log.info("--- filler final exchange buffer from " + fillBuffer);
+ fillBuffer = exchanger.exchange(fillBuffer, EXCHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ log.info("--- filler final got buffer " + fillBuffer);
+ } catch (Exception e) {
+ log.error("filler", e);
+ shutdownNow();
+ }
+ };
+ }
+
+ public Buffer getOutputBuffer() {
+ return outputBuffer;
+ }
+
+ public Buffer exchange(Buffer buffer) throws InterruptedException, TimeoutException {
+ return exchanger.exchange(buffer, EXCHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ }
+
+ public void shutdownNow() {
+ if (service != null) {
+ log.info("--- shutting down buffers");
+ service.shutdownNow();
+ }
+ }
+
+ /**
+ * Start processing and block until complete or Exception is thrown.
+ * @param writer writer that exchanges and processes buffers received from a producer.
+ * @throws IOException on errors
+ */
+ public void run(Callable<Boolean> writer) throws IOException {
+ service = ExecutorUtil.newMDCAwareFixedThreadPool(2, new SolrNamedThreadFactory("ExportBuffers"));
+ try {
+ CompletableFuture.runAsync(filler, service);
+ writer.call();
+
+ // alternatively we could run the writer in a separate thread too:
+// CompletableFuture<Void> allDone = CompletableFuture.allOf(
+// CompletableFuture.runAsync(filler, service),
+// CompletableFuture.runAsync(() -> {
+// try {
+// writer.call();
+// } catch (Exception e) {
+// log.error("writer", e);
+// shutdownNow();
+// }
+// }, service)
+// );
+// allDone.join();
+ log.info("-- finished.");
+ } catch (Exception e) {
+ log.error("Exception running filler / writer", e);
+ //
+ } finally {
+ log.info("--- all done, shutting down buffers");
+ service.shutdownNow();
}
}
}
@@ -486,48 +631,76 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
SortDoc sortDoc = getSortDoc(req.getSearcher(), sort.getSort());
final int queueSize = Math.min(DOCUMENT_BATCH_SIZE, totalHits);
- SortQueue queue = new SortQueue(queueSize, sortDoc);
- SortDoc[] outDocs = new SortDoc[queueSize];
+ ExportBuffers buffers = new ExportBuffers(this, leaves, req.getSearcher(), sort, queueSize, totalHits);
if (streamExpression != null) {
- streamContext.put(SORT_DOCS_KEY, outDocs);
- streamContext.put(SORT_QUEUE_KEY, queue);
- streamContext.put(SORT_DOC_KEY, sortDoc);
- streamContext.put(TOTAL_HITS_KEY, totalHits);
streamContext.put(EXPORT_WRITER_KEY, this);
+ streamContext.put(EXPORT_BUFFERS_KEY, buffers);
streamContext.put(LEAF_READERS_KEY, leaves);
- TupleStream tupleStream = createTupleStream();
+ final TupleStream tupleStream = createTupleStream();
tupleStream.open();
- for (;;) {
- final Tuple t = tupleStream.read();
- if (t == null) {
- break;
- }
- if (t.EOF) {
- break;
+ buffers.run(() -> {
+ for (;;) {
+ if (Thread.currentThread().isInterrupted()) {
+ break;
+ }
+ final Tuple t = tupleStream.read();
+ if (t == null) {
+ break;
+ }
+ if (t.EOF) {
+ break;
+ }
+ writer.add((MapWriter) ew -> t.writeMap(ew));
}
- writer.add((MapWriter) ew -> t.writeMap(ew));
- }
+ return true;
+ });
tupleStream.close();
} else {
- for (int count = 0; count < totalHits; ) {
- int outDocsIndex = fillOutDocs(leaves, sortDoc, queue, outDocs);
- count += (outDocsIndex + 1);
- addDocsToItemWriter(leaves, writer, outDocs, outDocsIndex);
- }
+ buffers.run(() -> {
+ Buffer buffer = buffers.getOutputBuffer();
+ log.info("--- writer init exchanging from " + buffer);
+ buffer = buffers.exchange(buffer);
+ log.info("--- writer init got " + buffer);
+ while (buffer.outDocsIndex != Buffer.NO_MORE_DOCS) {
+ if (Thread.currentThread().isInterrupted()) {
+ log.info("--- writer interrupted");
+ break;
+ }
+ for (int i = buffer.outDocsIndex; i >= 0; --i) {
+ buffer.writeItem(i, writer);
+ }
+ log.info("--- writer exchanging from " + buffer);
+ buffer = buffers.exchange(buffer);
+ log.info("--- writer got " + buffer);
+ }
+ return true;
+ });
}
}
- private int fillOutDocs(List<LeafReaderContext> leaves, SortDoc sortDoc,
- SortQueue sortQueue, SortDoc[] outDocs) throws IOException {
+ private void fillOutDocs(List<LeafReaderContext> leaves, SortDoc sortDoc,
+ SortQueue sortQueue, SortDoc[] outDocs, Buffer buffer) throws IOException {
identifyLowestSortingUnexportedDocs(leaves, sortDoc, sortQueue);
- return transferBatchToArrayForOutput(sortQueue, outDocs);
+ transferBatchToBufferForOutput(sortQueue, outDocs, leaves, buffer);
+ }
+
+ private void materializeDocs(List<LeafReaderContext> leaves, SortDoc[] outDocs, Buffer buffer) throws IOException {
+ log.info("--- materialize docs in " + buffer);
+ if (buffer.outDocsIndex < 0) {
+ return;
+ }
+ for (int i = buffer.outDocsIndex; i >= 0; i--) {
+ SortDoc sortDoc = outDocs[i];
+ EntryWriter ew = buffer.getEntryWriter(i, fieldWriters.length);
+ writeDoc(sortDoc, leaves, ew);
+ sortDoc.reset();
+ }
}
void writeDoc(SortDoc sortDoc,
List<LeafReaderContext> leaves,
EntryWriter ew) throws IOException {
-
int ord = sortDoc.ord;
FixedBitSet set = sets[ord];
set.clear(sortDoc.docId);
diff --git a/solr/core/src/java/org/apache/solr/handler/export/SingleValueSortDoc.java b/solr/core/src/java/org/apache/solr/handler/export/SingleValueSortDoc.java
index 164c07b..963901c 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/SingleValueSortDoc.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/SingleValueSortDoc.java
@@ -32,11 +32,6 @@ class SingleValueSortDoc extends SortDoc {
return null;
}
- @Override
- public SortValue[] getSortValues() {
- return new SortValue[] { value1 };
- }
-
public void setNextReader(LeafReaderContext context) throws IOException {
this.ord = context.ord;
this.docBase = context.docBase;
diff --git a/solr/core/src/java/org/apache/solr/handler/export/SortDoc.java b/solr/core/src/java/org/apache/solr/handler/export/SortDoc.java
index 292e795..5e2c75d 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/SortDoc.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/SortDoc.java
@@ -45,10 +45,6 @@ class SortDoc {
return null;
}
- public SortValue[] getSortValues() {
- return sortValues;
- }
-
public void setNextReader(LeafReaderContext context) throws IOException {
this.ord = context.ord;
this.docBase = context.docBase;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
index 2bdb2aa..3b58e86 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
@@ -91,6 +91,10 @@ public class Tuple implements Cloneable, MapWriter {
throw new RuntimeException("must have a matching number of key-value pairs");
}
for (int i = 0; i < fields.length; i += 2) {
+ // skip empty entries
+ if (fields[i] == null) {
+ continue;
+ }
put(fields[i], fields[i + 1]);
}
}