You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2020/06/18 00:35:41 UTC

[lucene-solr] 01/02: SOLR-14537: Write doc values in the writer thread instead of the filler thread.

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch jira/solr-14537
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit cd6a6c6b3e39a149188dabdae2a10ae93d86b4ce
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Wed Jun 17 23:48:17 2020 +0200

    SOLR-14537: Write doc values in the writer thread instead of the filler thread.
---
 .../apache/solr/handler/export/ExportBuffers.java  | 88 ++++------------------
 .../apache/solr/handler/export/ExportWriter.java   | 38 ++++------
 .../solr/handler/export/ExportWriterStream.java    | 31 +++++++-
 .../org/apache/solr/handler/export/LongValue.java  |  4 +-
 .../solr/handler/export/StringFieldWriter.java     |  4 +-
 .../apache/solr/handler/export/StringValue.java    | 11 ++-
 6 files changed, 71 insertions(+), 105 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java b/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
index 57691fe..817e958 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
@@ -35,7 +35,6 @@ import com.codahale.metrics.Timer;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.Sort;
 import org.apache.solr.common.IteratorWriter;
-import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.search.SolrIndexSearcher;
@@ -53,7 +52,6 @@ class ExportBuffers {
 
   final Buffer bufferOne;
   final Buffer bufferTwo;
-  final SortDoc[] outDocs;
   final List<LeafReaderContext> leaves;
   final ExportWriter exportWriter;
   final OutputStream os;
@@ -74,7 +72,7 @@ class ExportBuffers {
 
   ExportBuffers(ExportWriter exportWriter, List<LeafReaderContext> leaves, SolrIndexSearcher searcher,
                 OutputStream os, IteratorWriter.ItemWriter rawWriter, Sort sort, int queueSize, int totalHits,
-                Timer writeOutputBufferTimer, Timer fillerWaitTimer, Timer writerWaitTimer) {
+                Timer writeOutputBufferTimer, Timer fillerWaitTimer, Timer writerWaitTimer) throws IOException {
     this.exportWriter = exportWriter;
     this.leaves = leaves;
     this.os = os;
@@ -90,23 +88,25 @@ class ExportBuffers {
     this.writeOutputBufferTimer = writeOutputBufferTimer;
     this.fillerWaitTimer = fillerWaitTimer;
     this.writerWaitTimer = writerWaitTimer;
-    this.outDocs = new SortDoc[queueSize];
-    this.bufferOne = new Buffer();
-    this.bufferTwo = new Buffer();
+    this.bufferOne = new Buffer(queueSize);
+    this.bufferTwo = new Buffer(queueSize);
     this.totalHits = totalHits;
     fillBuffer = bufferOne;
     outputBuffer = bufferTwo;
+    SortDoc writerSortDoc = exportWriter.getSortDoc(searcher, sort.getSort());
+    bufferOne.initialize(writerSortDoc);
+    bufferTwo.initialize(writerSortDoc);
     barrier = new CyclicBarrier(2, () -> swapBuffers());
     filler = () -> {
       try {
         log.info("--- filler start " + Thread.currentThread());
-        Buffer buffer = getFillBuffer();
         SortDoc sortDoc = exportWriter.getSortDoc(searcher, sort.getSort());
+        Buffer buffer = getFillBuffer();
         SortQueue queue = new SortQueue(queueSize, sortDoc);
         long lastOutputCounter = 0;
         for (int count = 0; count < totalHits; ) {
           log.info("--- filler fillOutDocs in " + fillBuffer);
-          exportWriter.fillOutDocs(leaves, sortDoc, queue, outDocs, buffer);
+          exportWriter.fillOutDocs(leaves, sortDoc, queue, buffer);
           count += (buffer.outDocsIndex + 1);
           log.info("--- filler count=" + count + ", exchange buffer from " + buffer);
           Timer.Context timerContext = getFillerWaitTimer().time();
@@ -241,78 +241,22 @@ class ExportBuffers {
     }
   }
 
-  public static final class Buffer implements MapWriter.EntryWriter {
+  public static final class Buffer {
     static final int EMPTY = -1;
     static final int NO_MORE_DOCS = -2;
 
     int outDocsIndex = EMPTY;
-    // use array-of-arrays instead of Map to conserve space
-    Object[][] outDocs;
-    int pos = EMPTY;
+    SortDoc[] outDocs;
 
-    MapWriter.EntryWriter getEntryWriter(int pos, int numFields) {
-      if (outDocs == null) {
-        outDocs = new Object[outDocsIndex + 1][];
-      }
-      this.pos = pos;
-      Object[] fields = new Object[numFields << 1];
-      outDocs[pos] = fields;
-      return this;
+    public Buffer(int size) {
+      outDocs = new SortDoc[size];
     }
 
-    @Override
-    public MapWriter.EntryWriter put(CharSequence k, Object v) throws IOException {
-      if (pos < 0) {
-        throw new IOException("Invalid entry position");
-      }
-      Object[] fields = outDocs[pos];
-      boolean putOk = false;
-      for (int i = 0; i < fields.length; i += 2) {
-        if (fields[i] == null || fields[i].equals(k)) {
-          fields[i] = k;
-          // convert everything complex into POJOs at this point
-          // to avoid accessing docValues or termEnums from other threads
-          if (v instanceof IteratorWriter) {
-            List lst = new ArrayList();
-            ((IteratorWriter)v).toList(lst);
-            v = lst;
-          } else if (v instanceof MapWriter) {
-            Map<String, Object> map = new HashMap<>();
-            ((MapWriter)v).toMap(map);
-            v = map;
-          }
-          fields[i + 1] = v;
-          putOk = true;
-          break;
-        }
-      }
-      if (!putOk) {
-        throw new IOException("should not happen! pos=" + pos + " ran out of space for field " + k + "=" + v
-            +  " - already full: " + Arrays.toString(fields));
-      }
-      return this;
-    }
-
-    // helper method to make it easier to write our internal key-value array as if it were a map
-    public void writeItem(int pos, IteratorWriter.ItemWriter itemWriter) throws IOException {
-      final Object[] fields = outDocs[pos];
-      if (fields == null) {
-        return;
-      }
-      itemWriter.add((MapWriter) ew -> {
-        for (int i = 0; i < fields.length; i += 2) {
-          if (fields[i] == null) {
-            continue;
-          }
-          ew.put((CharSequence)fields[i], fields[i + 1]);
-        }
-      });
-    }
-
-    public void reset() {
+    public void initialize(SortDoc proto) {
       outDocsIndex = EMPTY;
-      pos = EMPTY;
-      outDocs = null;
+      for (int i = 0; i < outDocs.length; i++) {
+        outDocs[i] = proto.copy();
+      }
     }
 
     @Override
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
index 26bc841..7dbeac2 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
@@ -321,21 +321,25 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
     }
   }
 
-  protected void transferBatchToBufferForOutput(SortQueue queue, SortDoc[] outDocs,
-                                                List<LeafReaderContext> leaves, ExportBuffers.Buffer destination) throws IOException {
+  protected void transferBatchToBufferForOutput(SortQueue queue,
+                                                List<LeafReaderContext> leaves,
+                                                ExportBuffers.Buffer destination) throws IOException {
     Timer.Context timerContext = transferBatchToBufferTimer.time();
     try {
       int outDocsIndex = -1;
       for (int i = 0; i < queue.maxSize; i++) {
         SortDoc s = queue.pop();
         if (s.docId > -1) {
-          outDocs[++outDocsIndex] = s;
+          destination.outDocs[++outDocsIndex].setValues(s);
           // remove this doc id from the matching bitset, it's been exported
           sets[s.ord].clear(s.docId);
+          s.reset(); // reuse
         }
       }
       destination.outDocsIndex = outDocsIndex;
-      materializeDocs(leaves, outDocs, destination);
+    } catch (Throwable t) {
+      log.error("transfer", t);
+      throw t;
     } finally {
       timerContext.stop();
     }
@@ -398,7 +402,8 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
               // we're using the raw writer here because there's no potential
               // reduction in the number of output items, unlike when using
               // streaming expressions
-              buffer.writeItem(i, writer);
+              final SortDoc currentDoc = buffer.outDocs[i];
+              writer.add((MapWriter) ew -> writeDoc(currentDoc, leaves, ew, fieldWriters));
             }
           } finally {
             timerContext.stop();
@@ -419,38 +424,25 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
   }
 
   void fillOutDocs(List<LeafReaderContext> leaves, SortDoc sortDoc,
-                          SortQueue sortQueue, SortDoc[] outDocs, ExportBuffers.Buffer buffer) throws IOException {
+                          SortQueue sortQueue, ExportBuffers.Buffer buffer) throws IOException {
     identifyLowestSortingUnexportedDocs(leaves, sortDoc, sortQueue);
-    transferBatchToBufferForOutput(sortQueue, outDocs, leaves, buffer);
-  }
-
-  private void materializeDocs(List<LeafReaderContext> leaves, SortDoc[] outDocs, ExportBuffers.Buffer buffer) throws IOException {
-    log.info("--- materialize docs in " + buffer);
-    if (buffer.outDocsIndex < 0) {
-      return;
-    }
-    for (int i = buffer.outDocsIndex; i >= 0; i--) {
-      SortDoc sortDoc = outDocs[i];
-      EntryWriter ew = buffer.getEntryWriter(i, fieldWriters.length);
-      writeDoc(sortDoc, leaves, ew);
-      sortDoc.reset();
-    }
+    transferBatchToBufferForOutput(sortQueue, leaves, buffer);
   }
 
   void writeDoc(SortDoc sortDoc,
                           List<LeafReaderContext> leaves,
-                          EntryWriter ew) throws IOException {
+                          EntryWriter ew, FieldWriter[] writers) throws IOException {
     int ord = sortDoc.ord;
     LeafReaderContext context = leaves.get(ord);
     int fieldIndex = 0;
-    for (FieldWriter fieldWriter : fieldWriters) {
+    for (FieldWriter fieldWriter : writers) {
       if (fieldWriter.write(sortDoc, context.reader(), ew, fieldIndex)) {
         ++fieldIndex;
       }
     }
   }
 
-  protected FieldWriter[] getFieldWriters(String[] fields, SolrIndexSearcher searcher) throws IOException {
+  public FieldWriter[] getFieldWriters(String[] fields, SolrIndexSearcher searcher) throws IOException {
     IndexSchema schema = searcher.getSchema();
     FieldWriter[] writers = new FieldWriter[fields.length];
     for (int i = 0; i < fields.length; i++) {
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java
index 1ff8c84..bbdce58 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java
@@ -18,7 +18,10 @@ package org.apache.solr.handler.export;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.TimeoutException;
 
@@ -36,6 +39,8 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.IteratorWriter;
+import org.apache.solr.common.MapWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +52,7 @@ import org.slf4j.LoggerFactory;
  */
 public class ExportWriterStream extends TupleStream implements Expressible {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  final TupleEntryWriter tupleEntryWriter = new TupleEntryWriter();
   StreamContext context;
   StreamComparator streamComparator;
   int pos = -1;
@@ -54,6 +60,25 @@ public class ExportWriterStream extends TupleStream implements Expressible {
   ExportBuffers.Buffer buffer;
   Timer.Context writeOutputTimerContext;
 
+  private static final class TupleEntryWriter implements EntryWriter {
+    Tuple tuple;
+
+    @Override
+    public EntryWriter put(CharSequence k, Object v) throws IOException {
+      if (v instanceof IteratorWriter) {
+        List lst = new ArrayList();
+        ((IteratorWriter)v).toList(lst);
+        v = lst;
+      } else if (v instanceof MapWriter) {
+        Map<String, Object> map = new HashMap<>();
+        ((MapWriter)v).toMap(map);
+        v = map;
+      }
+      tuple.put(k.toString(), v);
+      return this;
+    }
+  }
+
   public ExportWriterStream(StreamExpression expression, StreamFactory factory) throws IOException {
     streamComparator = parseComp(factory.getDefaultSort());
   }
@@ -188,9 +213,11 @@ public class ExportWriterStream extends TupleStream implements Expressible {
     if (writeOutputTimerContext == null) {
       writeOutputTimerContext = exportBuffers.getWriteOutputBufferTimer().time();
     }
-    res = new Tuple(buffer.outDocs[pos]);
+    SortDoc sortDoc = buffer.outDocs[pos];
+    tupleEntryWriter.tuple = new Tuple();
+    exportBuffers.exportWriter.writeDoc(sortDoc, exportBuffers.leaves, tupleEntryWriter, exportBuffers.exportWriter.fieldWriters);
     pos--;
-    return res;
+    return tupleEntryWriter.tuple;
   }
 
   @Override
diff --git a/solr/core/src/java/org/apache/solr/handler/export/LongValue.java b/solr/core/src/java/org/apache/solr/handler/export/LongValue.java
index dce365b..63794ad 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/LongValue.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/LongValue.java
@@ -25,10 +25,10 @@ import org.apache.lucene.index.NumericDocValues;
 
 public class LongValue implements SortValue {
 
+  final protected String field;
+  final protected LongComp comp;
   protected NumericDocValues vals;
-  protected String field;
   protected long currentValue;
-  protected LongComp comp;
   private int lastDocID;
   private boolean present;
 
diff --git a/solr/core/src/java/org/apache/solr/handler/export/StringFieldWriter.java b/solr/core/src/java/org/apache/solr/handler/export/StringFieldWriter.java
index b82c365..7fb24ae 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/StringFieldWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/StringFieldWriter.java
@@ -73,9 +73,9 @@ class StringFieldWriter extends FieldWriter {
       ew.put(this.field, utf8.reset(ref.bytes, ref.offset, ref.length, null));
     } else {
       String v = null;
-      if(sortValue != null) {
+      if (sortValue != null) {
         v = ((StringValue) sortValue).getLastString();
-        if(v == null) {
+        if (v == null) {
           fieldType.indexedToReadable(ref, cref);
           v = cref.toString();
           ((StringValue) sortValue).setLastString(v);
diff --git a/solr/core/src/java/org/apache/solr/handler/export/StringValue.java b/solr/core/src/java/org/apache/solr/handler/export/StringValue.java
index fc70565..2ea25d5 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/StringValue.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/StringValue.java
@@ -31,13 +31,13 @@ class StringValue implements SortValue {
 
   protected SortedDocValues globalDocValues;
 
-  protected OrdinalMap ordinalMap;
+  final protected OrdinalMap ordinalMap;
   protected LongValues toGlobal = LongValues.IDENTITY; // this segment to global ordinal. NN;
   protected SortedDocValues docValues;
 
-  protected String field;
+  final protected String field;
   protected int currentOrd;
-  protected IntComp comp;
+  final protected IntComp comp;
   protected int lastDocID;
   private boolean present;
 
@@ -50,6 +50,8 @@ class StringValue implements SortValue {
     this.docValues = globalDocValues;
     if (globalDocValues instanceof MultiDocValues.MultiSortedDocValues) {
       this.ordinalMap = ((MultiDocValues.MultiSortedDocValues) globalDocValues).mapping;
+    } else {
+      this.ordinalMap = null;
     }
     this.field = field;
     this.comp = comp;
@@ -66,7 +68,8 @@ class StringValue implements SortValue {
   }
 
   public StringValue copy() {
-    return new StringValue(globalDocValues, field, comp);
+    StringValue copy = new StringValue(globalDocValues, field, comp);
+    return copy;
   }
 
   public void setCurrentValue(int docId) throws IOException {