You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2020/06/10 17:30:06 UTC

[lucene-solr] 01/01: SOLR-14537: Use double buffering to increase throughput.

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch jira/solr-14537
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 2dba525ae6b37e826eb4a08827ed0e109e88784b
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Wed Jun 10 19:29:28 2020 +0200

    SOLR-14537: Use double buffering to increase throughput.
---
 .../apache/solr/handler/export/ExportWriter.java   | 367 +++++++++++++++------
 .../solr/handler/export/SingleValueSortDoc.java    |   5 -
 .../org/apache/solr/handler/export/SortDoc.java    |   4 -
 .../org/apache/solr/client/solrj/io/Tuple.java     |   4 +
 4 files changed, 274 insertions(+), 106 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
index 3eccd0d..813db92 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
@@ -24,7 +24,17 @@ import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.lang.invoke.MethodHandles;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Exchanger;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
@@ -58,7 +68,9 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.params.StreamParams;
+import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.JavaBinCodec;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.request.SolrRequestInfo;
@@ -106,11 +118,8 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
   private static final int DOCUMENT_BATCH_SIZE = 30000;
 
   private static final String EXPORT_WRITER_KEY = "__ew__";
-  private static final String SORT_DOCS_KEY = "_ew_docs_";
-  private static final String TOTAL_HITS_KEY = "_ew_totalHits_";
-  private static final String LEAF_READERS_KEY = "_ew_leaves_";
-  private static final String SORT_QUEUE_KEY = "_ew_queue_";
-  private static final String SORT_DOC_KEY = "_ew_sort_";
+  private static final String EXPORT_BUFFERS_KEY = "__eb__";
+  private static final String LEAF_READERS_KEY = "__leaves__";
 
   private OutputStreamWriter respWriter;
   final SolrQueryRequest req;
@@ -124,33 +133,12 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
   PushWriter writer;
   private String wt;
 
-  private static class TupleEntryWriter implements EntryWriter {
-    Tuple tuple;
-
-    void setTuple(Tuple tuple) {
-      this.tuple = tuple;
-    }
-
-    @Override
-    public EntryWriter put(CharSequence k, Object v) throws IOException {
-      tuple.put(k, v);
-      return this;
-    }
-  }
-
   public static class ExportWriterStream extends TupleStream implements Expressible {
     StreamContext context;
     StreamComparator streamComparator;
     int pos = -1;
-    int outDocIndex = -1;
-    int count;
-    SortDoc sortDoc;
-    SortQueue queue;
-    SortDoc[] docs;
-    int totalHits;
-    ExportWriter exportWriter;
-    List<LeafReaderContext> leaves;
-    final TupleEntryWriter entryWriter = new TupleEntryWriter();
+    ExportBuffers exportBuffers;
+    Buffer buffer;
 
     public ExportWriterStream(StreamExpression expression, StreamFactory factory) throws IOException {
       streamComparator = parseComp(factory.getDefaultSort());
@@ -170,7 +158,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
 
       String[] sorts = sort.split(",");
       StreamComparator[] comps = new StreamComparator[sorts.length];
-      for(int i=0; i<sorts.length; i++) {
+      for (int i = 0; i < sorts.length; i++) {
         String s = sorts[i];
 
         String[] spec = s.trim().split("\\s+"); //This should take into account spaces in the sort spec.
@@ -185,7 +173,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
         comps[i] = new FieldComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING);
       }
 
-      if(comps.length > 1) {
+      if (comps.length > 1) {
         return new MultipleFieldComparator(comps);
       } else {
         return comps[0];
@@ -194,40 +182,41 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
 
     @Override
     public void open() throws IOException {
-      docs = (SortDoc[]) context.get(SORT_DOCS_KEY);
-      queue = (SortQueue) context.get(SORT_QUEUE_KEY);
-      sortDoc = (SortDoc) context.get(SORT_DOC_KEY);
-      totalHits = (Integer) context.get(TOTAL_HITS_KEY);
-      exportWriter = (ExportWriter) context.get(EXPORT_WRITER_KEY);
-      leaves = (List<LeafReaderContext>) context.get(LEAF_READERS_KEY);
-      count = 0;
+      exportBuffers = (ExportBuffers) context.get(EXPORT_BUFFERS_KEY);
+      buffer = exportBuffers.getOutputBuffer();
     }
 
     @Override
     public void close() throws IOException {
-      exportWriter = null;
-      leaves = null;
+      exportBuffers = null;
     }
 
     @Override
     public Tuple read() throws IOException {
       if (pos < 0) {
-        if (count < totalHits) {
-          outDocIndex = exportWriter.fillOutDocs(leaves, sortDoc, queue, docs);
-          count += (outDocIndex + 1);
-          pos = outDocIndex;
-        } else {
+        try {
+          buffer.outDocsIndex = Buffer.EMPTY;
+          log.info("--- ews exchange empty buffer " + buffer);
+          buffer = exportBuffers.exchanger.exchange(buffer);
+          log.info("--- ews got new output buffer " + buffer);
+        } catch (InterruptedException e) {
+          log.info("--- ews interrupt");
+          Thread.currentThread().interrupt();
+          throw new IOException("interrupted");
+        }
+        if (buffer.outDocsIndex == Buffer.NO_MORE_DOCS) {
+          log.info("--- ews EOF");
           return Tuple.EOF();
+        } else {
+          pos = buffer.outDocsIndex;
+          log.info("--- ews new pos=" + pos);
         }
       }
       if (pos < 0) {
+        log.info("--- ews EOF?");
         return Tuple.EOF();
       }
-      Tuple tuple = new Tuple();
-      entryWriter.setTuple(tuple);
-      SortDoc s = docs[pos];
-      exportWriter.writeDoc(s, leaves, entryWriter);
-      s.reset();
+      Tuple tuple = new Tuple(buffer.outDocs[pos]);
       pos--;
       return tuple;
     }
@@ -442,41 +431,197 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
     }
   }
 
-  protected int transferBatchToArrayForOutput(SortQueue queue, SortDoc[] destinationArr) {
+  protected void transferBatchToBufferForOutput(SortQueue queue, SortDoc[] outDocs,
+                                                List<LeafReaderContext> leaves, Buffer destination) throws IOException {
     int outDocsIndex = -1;
     for (int i = 0; i < queue.maxSize; i++) {
       SortDoc s = queue.pop();
       if (s.docId > -1) {
-        destinationArr[++outDocsIndex] = s;
+        outDocs[++outDocsIndex] = s;
       }
     }
-
-    return outDocsIndex;
+    destination.outDocsIndex = outDocsIndex;
+    materializeDocs(leaves, outDocs, destination);
   }
 
-  protected void addDocsToItemWriter(List<LeafReaderContext> leaves, IteratorWriter.ItemWriter writer, SortDoc[] docsToExport, int outDocsIndex) throws IOException {
-    try {
-      for (int i = outDocsIndex; i >= 0; --i) {
-        SortDoc s = docsToExport[i];
-        writer.add((MapWriter) ew -> {
-          writeDoc(s, leaves, ew);
-          s.reset();
-        });
+  private static final class Buffer implements EntryWriter {
+    static final int EMPTY = -1;
+    static final int NO_MORE_DOCS = -2;
+
+    int outDocsIndex = EMPTY;
+    // use array-of-arrays instead of Map to conserve space
+    Object[][] outDocs;
+    int pos = EMPTY;
+
+    EntryWriter getEntryWriter(int pos, int numFields) {
+      if (outDocs == null) {
+        outDocs = new Object[outDocsIndex + 1][];
+      }
+      this.pos = pos;
+      Object[] fields = new Object[numFields << 1];
+      outDocs[pos] = fields;
+      return this;
+    }
+
+    @Override
+    public EntryWriter put(CharSequence k, Object v) throws IOException {
+      if (pos < 0) {
+        throw new IOException("Invalid entry position");
       }
-    } catch (Throwable e) {
-      Throwable ex = e;
-      while (ex != null) {
-        String m = ex.getMessage();
-        if (m != null && m.contains("Broken pipe")) {
-          throw new IgnoreException();
+      Object[] fields = outDocs[pos];
+      boolean putOk = false;
+      for (int i = 0; i < fields.length; i += 2) {
+        if (fields[i] == null || fields[i].equals(k)) {
+          fields[i] = k;
+          // convert everything complex into POJOs at this point
+          if (v instanceof IteratorWriter) {
+            List lst = new ArrayList();
+            ((IteratorWriter)v).toList(lst);
+            v = lst;
+          } else if (v instanceof MapWriter) {
+            Map<String, Object> map = new HashMap<>();
+            ((MapWriter)v).toMap(map);
+            v = map;
+          }
+          fields[i + 1] = v;
+          putOk = true;
+          break;
         }
-        ex = ex.getCause();
       }
+      if (!putOk) {
+        throw new IOException("should not happen! pos=" + pos + " ran out of space for field " + k + "=" + v
+            +  " - already full: " + Arrays.toString(fields));
+      }
+      return this;
+    }
 
-      if (e instanceof IOException) {
-        throw ((IOException) e);
-      } else {
-        throw new IOException(e);
+    public void writeItem(int pos, IteratorWriter.ItemWriter itemWriter) throws IOException {
+      final Object[] fields = outDocs[pos];
+      if (fields == null) {
+        log.info("--- writeItem no fields at " + pos);
+        return;
+      }
+      itemWriter.add((MapWriter) ew -> {
+        for (int i = 0; i < fields.length; i += 2) {
+          if (fields[i] == null) {
+            log.info("--- writeItem empty field at " + pos + "/" + i);
+            continue;
+          }
+          log.info("--- writeItem field " + pos + "/" + i);
+          ew.put((CharSequence)fields[i], fields[i + 1]);
+          log.info("--- writeItem done field " + pos + "/" + i);
+        }
+      });
+    }
+
+    public void reset() {
+      outDocsIndex = EMPTY;
+      pos = EMPTY;
+      outDocs = null;
+    }
+
+    @Override
+    public String toString() {
+      return "Buffer@" + Integer.toHexString(hashCode()) + "{" +
+          "outDocsIndex=" + outDocsIndex +
+          '}';
+    }
+  }
+
+  /* Helper class implementing a "double buffering" producer / consumer. */
+  private static final class ExportBuffers {
+    static final long EXCHANGE_TIMEOUT_SECONDS = 300;
+
+    final Buffer bufferOne;
+    final Buffer bufferTwo;
+    final Exchanger<Buffer> exchanger = new Exchanger<>();
+    final SortDoc[] outDocs;
+    final List<LeafReaderContext> leaves;
+    final ExportWriter exportWriter;
+    final int totalHits;
+    Buffer fillBuffer;
+    Buffer outputBuffer;
+    Runnable filler;
+    ExecutorService service;
+
+    ExportBuffers (ExportWriter exportWriter, List<LeafReaderContext> leaves, SolrIndexSearcher searcher, Sort sort, int queueSize, int totalHits) {
+      this.exportWriter = exportWriter;
+      this.leaves = leaves;
+      this.outDocs = new SortDoc[queueSize];
+      this.bufferOne = new Buffer();
+      this.bufferTwo = new Buffer();
+      this.totalHits = totalHits;
+      fillBuffer = bufferOne;
+      outputBuffer = bufferTwo;
+      filler = () -> {
+        try {
+          SortDoc sortDoc = exportWriter.getSortDoc(searcher, sort.getSort());
+          SortQueue queue = new SortQueue(queueSize, sortDoc);
+          for (int count = 0; count < totalHits; ) {
+            log.info("--- filler fillOutDocs in " + fillBuffer);
+            exportWriter.fillOutDocs(leaves, sortDoc, queue, outDocs, fillBuffer);
+            count += (fillBuffer.outDocsIndex + 1);
+            log.info("--- filler count=" + count + ", exchange buffer from " + fillBuffer);
+            fillBuffer = exchanger.exchange(fillBuffer, EXCHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+            log.info("--- filler got empty buffer " + fillBuffer);
+          }
+          fillBuffer.outDocsIndex = Buffer.NO_MORE_DOCS;
+          log.info("--- filler final exchange buffer from " + fillBuffer);
+          fillBuffer = exchanger.exchange(fillBuffer, EXCHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+          log.info("--- filler final got buffer " + fillBuffer);
+        } catch (Exception e) {
+          log.error("filler", e);
+          shutdownNow();
+        }
+      };
+    }
+
+    public Buffer getOutputBuffer() {
+      return outputBuffer;
+    }
+
+    public Buffer exchange(Buffer buffer) throws InterruptedException, TimeoutException {
+      return exchanger.exchange(buffer, EXCHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+    public void shutdownNow() {
+      if (service != null) {
+        log.info("--- shutting down buffers");
+        service.shutdownNow();
+      }
+    }
+
+    /**
+     * Start processing and block until complete or Exception is thrown.
+     * @param writer writer that exchanges and processes buffers received from a producer.
+     * @throws IOException on errors
+     */
+    public void run(Callable<Boolean> writer) throws IOException {
+      service = ExecutorUtil.newMDCAwareFixedThreadPool(2, new SolrNamedThreadFactory("ExportBuffers"));
+      try {
+        CompletableFuture.runAsync(filler, service);
+        writer.call();
+
+        // alternatively we could run the writer in a separate thread too:
+//        CompletableFuture<Void> allDone = CompletableFuture.allOf(
+//            CompletableFuture.runAsync(filler, service),
+//            CompletableFuture.runAsync(() -> {
+//              try {
+//                writer.call();
+//              } catch (Exception e) {
+//                log.error("writer", e);
+//                shutdownNow();
+//              }
+//            }, service)
+//        );
+//        allDone.join();
+        log.info("-- finished.");
+      } catch (Exception e) {
+        log.error("Exception running filler / writer", e);
+         //
+      } finally {
+        log.info("--- all done, shutting down buffers");
+        service.shutdownNow();
       }
     }
   }
@@ -486,48 +631,76 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
     SortDoc sortDoc = getSortDoc(req.getSearcher(), sort.getSort());
     final int queueSize = Math.min(DOCUMENT_BATCH_SIZE, totalHits);
 
-    SortQueue queue = new SortQueue(queueSize, sortDoc);
-    SortDoc[] outDocs = new SortDoc[queueSize];
+    ExportBuffers buffers = new ExportBuffers(this, leaves, req.getSearcher(), sort, queueSize, totalHits);
 
     if (streamExpression != null) {
-      streamContext.put(SORT_DOCS_KEY, outDocs);
-      streamContext.put(SORT_QUEUE_KEY, queue);
-      streamContext.put(SORT_DOC_KEY, sortDoc);
-      streamContext.put(TOTAL_HITS_KEY, totalHits);
       streamContext.put(EXPORT_WRITER_KEY, this);
+      streamContext.put(EXPORT_BUFFERS_KEY, buffers);
       streamContext.put(LEAF_READERS_KEY, leaves);
-      TupleStream tupleStream = createTupleStream();
+      final TupleStream tupleStream = createTupleStream();
       tupleStream.open();
-      for (;;) {
-        final Tuple t = tupleStream.read();
-        if (t == null) {
-          break;
-        }
-        if (t.EOF) {
-          break;
+      buffers.run(() -> {
+        for (;;) {
+          if (Thread.currentThread().isInterrupted()) {
+            break;
+          }
+          final Tuple t = tupleStream.read();
+          if (t == null) {
+            break;
+          }
+          if (t.EOF) {
+            break;
+          }
+          writer.add((MapWriter) ew -> t.writeMap(ew));
         }
-        writer.add((MapWriter) ew -> t.writeMap(ew));
-      }
+        return true;
+      });
       tupleStream.close();
     } else {
-      for (int count = 0; count < totalHits; ) {
-        int outDocsIndex = fillOutDocs(leaves, sortDoc, queue, outDocs);
-        count += (outDocsIndex + 1);
-        addDocsToItemWriter(leaves, writer, outDocs, outDocsIndex);
-      }
+      buffers.run(() -> {
+        Buffer buffer = buffers.getOutputBuffer();
+        log.info("--- writer init exchanging from " + buffer);
+        buffer = buffers.exchange(buffer);
+        log.info("--- writer init got " + buffer);
+        while (buffer.outDocsIndex != Buffer.NO_MORE_DOCS) {
+          if (Thread.currentThread().isInterrupted()) {
+            log.info("--- writer interrupted");
+            break;
+          }
+          for (int i = buffer.outDocsIndex; i >= 0; --i) {
+            buffer.writeItem(i, writer);
+          }
+          log.info("--- writer exchanging from " + buffer);
+          buffer = buffers.exchange(buffer);
+          log.info("--- writer got " + buffer);
+        }
+        return true;
+      });
     }
   }
 
-  private int fillOutDocs(List<LeafReaderContext> leaves, SortDoc sortDoc,
-                          SortQueue sortQueue, SortDoc[] outDocs) throws IOException {
+  private void fillOutDocs(List<LeafReaderContext> leaves, SortDoc sortDoc,
+                          SortQueue sortQueue, SortDoc[] outDocs, Buffer buffer) throws IOException {
     identifyLowestSortingUnexportedDocs(leaves, sortDoc, sortQueue);
-    return transferBatchToArrayForOutput(sortQueue, outDocs);
+    transferBatchToBufferForOutput(sortQueue, outDocs, leaves, buffer);
+  }
+
+  private void materializeDocs(List<LeafReaderContext> leaves, SortDoc[] outDocs, Buffer buffer) throws IOException {
+    log.info("--- materialize docs in " + buffer);
+    if (buffer.outDocsIndex < 0) {
+      return;
+    }
+    for (int i = buffer.outDocsIndex; i >= 0; i--) {
+      SortDoc sortDoc = outDocs[i];
+      EntryWriter ew = buffer.getEntryWriter(i, fieldWriters.length);
+      writeDoc(sortDoc, leaves, ew);
+      sortDoc.reset();
+    }
   }
 
   void writeDoc(SortDoc sortDoc,
                           List<LeafReaderContext> leaves,
                           EntryWriter ew) throws IOException {
-
     int ord = sortDoc.ord;
     FixedBitSet set = sets[ord];
     set.clear(sortDoc.docId);
diff --git a/solr/core/src/java/org/apache/solr/handler/export/SingleValueSortDoc.java b/solr/core/src/java/org/apache/solr/handler/export/SingleValueSortDoc.java
index 164c07b..963901c 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/SingleValueSortDoc.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/SingleValueSortDoc.java
@@ -32,11 +32,6 @@ class SingleValueSortDoc extends SortDoc {
     return null;
   }
 
-  @Override
-  public SortValue[] getSortValues() {
-    return new SortValue[] { value1 };
-  }
-
   public void setNextReader(LeafReaderContext context) throws IOException {
     this.ord = context.ord;
     this.docBase = context.docBase;
diff --git a/solr/core/src/java/org/apache/solr/handler/export/SortDoc.java b/solr/core/src/java/org/apache/solr/handler/export/SortDoc.java
index 292e795..5e2c75d 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/SortDoc.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/SortDoc.java
@@ -45,10 +45,6 @@ class SortDoc {
     return null;
   }
 
-  public SortValue[] getSortValues() {
-    return sortValues;
-  }
-
   public void setNextReader(LeafReaderContext context) throws IOException {
     this.ord = context.ord;
     this.docBase = context.docBase;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
index 2bdb2aa..3b58e86 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
@@ -91,6 +91,10 @@ public class Tuple implements Cloneable, MapWriter {
       throw new RuntimeException("must have a matching number of key-value pairs");
     }
     for (int i = 0; i < fields.length; i += 2) {
+      // skip empty entries
+      if (fields[i] == null) {
+        continue;
+      }
       put(fields[i], fields[i + 1]);
     }
   }