Posted to commits@lucene.apache.org by ab...@apache.org on 2020/06/10 17:30:05 UTC

[lucene-solr] branch jira/solr-14537 created (now 2dba525)

This is an automated email from the ASF dual-hosted git repository.

ab pushed a change to branch jira/solr-14537
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git.


      at 2dba525  SOLR-14537: Use double buffering to increase throughput.

This branch includes the following new commits:

     new 2dba525  SOLR-14537: Use double buffering to increase throughput.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[lucene-solr] 01/01: SOLR-14537: Use double buffering to increase throughput.

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch jira/solr-14537
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 2dba525ae6b37e826eb4a08827ed0e109e88784b
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Wed Jun 10 19:29:28 2020 +0200

    SOLR-14537: Use double buffering to increase throughput.
---
 .../apache/solr/handler/export/ExportWriter.java   | 367 +++++++++++++++------
 .../solr/handler/export/SingleValueSortDoc.java    |   5 -
 .../org/apache/solr/handler/export/SortDoc.java    |   4 -
 .../org/apache/solr/client/solrj/io/Tuple.java     |   4 +
 4 files changed, 274 insertions(+), 106 deletions(-)
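
For reference, the core of this change is a classic double-buffering
handoff: a filler thread sorts and materializes documents into one buffer
while the writer thread drains the other, and the two threads swap buffers
through a java.util.concurrent.Exchanger. A minimal, self-contained sketch
of that pattern follows (names here are illustrative, not the patch's
actual ExportBuffers/Buffer classes):

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DoubleBufferDemo {
  // Sentinel in the spirit of the patch's Buffer.NO_MORE_DOCS: a negative
  // count tells the consumer the producer is done.
  static final int NO_MORE = -1;

  static class Batch {
    int count = 0;
    int[] values = new int[4];
  }

  public static void main(String[] args) throws Exception {
    Exchanger<Batch> exchanger = new Exchanger<>();
    ExecutorService pool = Executors.newFixedThreadPool(1);

    // Producer: fill a batch, then swap it for the drained one the consumer returns.
    pool.execute(() -> {
      Batch fill = new Batch();
      try {
        for (int produced = 0; produced < 10; ) {
          fill.count = Math.min(fill.values.length, 10 - produced);
          for (int i = 0; i < fill.count; i++) {
            fill.values[i] = produced + i;   // "materialize" while the consumer writes
          }
          produced += fill.count;
          fill = exchanger.exchange(fill);   // hand over full batch, get back empty one
        }
        fill.count = NO_MORE;                // final exchange signals end-of-stream
        exchanger.exchange(fill);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    // Consumer: offer an empty batch, receive a full one, drain it, repeat.
    Batch out = new Batch();
    out = exchanger.exchange(out);
    while (out.count != NO_MORE) {
      for (int i = 0; i < out.count; i++) {
        System.out.println(out.values[i]);
      }
      out = exchanger.exchange(out);
    }
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
  }
}

The patch itself uses the timeout variant, exchange(buffer, 300,
TimeUnit.SECONDS), so a stalled peer fails the export instead of hanging
it, and signals end-of-stream with the same kind of sentinel as above.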

diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
index 3eccd0d..813db92 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
@@ -24,7 +24,17 @@ import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.lang.invoke.MethodHandles;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Exchanger;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
@@ -58,7 +68,9 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.params.StreamParams;
+import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.JavaBinCodec;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.request.SolrRequestInfo;
@@ -106,11 +118,8 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
   private static final int DOCUMENT_BATCH_SIZE = 30000;
 
   private static final String EXPORT_WRITER_KEY = "__ew__";
-  private static final String SORT_DOCS_KEY = "_ew_docs_";
-  private static final String TOTAL_HITS_KEY = "_ew_totalHits_";
-  private static final String LEAF_READERS_KEY = "_ew_leaves_";
-  private static final String SORT_QUEUE_KEY = "_ew_queue_";
-  private static final String SORT_DOC_KEY = "_ew_sort_";
+  private static final String EXPORT_BUFFERS_KEY = "__eb__";
+  private static final String LEAF_READERS_KEY = "__leaves__";
 
   private OutputStreamWriter respWriter;
   final SolrQueryRequest req;
@@ -124,33 +133,12 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
   PushWriter writer;
   private String wt;
 
-  private static class TupleEntryWriter implements EntryWriter {
-    Tuple tuple;
-
-    void setTuple(Tuple tuple) {
-      this.tuple = tuple;
-    }
-
-    @Override
-    public EntryWriter put(CharSequence k, Object v) throws IOException {
-      tuple.put(k, v);
-      return this;
-    }
-  }
-
   public static class ExportWriterStream extends TupleStream implements Expressible {
     StreamContext context;
     StreamComparator streamComparator;
     int pos = -1;
-    int outDocIndex = -1;
-    int count;
-    SortDoc sortDoc;
-    SortQueue queue;
-    SortDoc[] docs;
-    int totalHits;
-    ExportWriter exportWriter;
-    List<LeafReaderContext> leaves;
-    final TupleEntryWriter entryWriter = new TupleEntryWriter();
+    ExportBuffers exportBuffers;
+    Buffer buffer;
 
     public ExportWriterStream(StreamExpression expression, StreamFactory factory) throws IOException {
       streamComparator = parseComp(factory.getDefaultSort());
@@ -170,7 +158,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
 
       String[] sorts = sort.split(",");
       StreamComparator[] comps = new StreamComparator[sorts.length];
-      for(int i=0; i<sorts.length; i++) {
+      for (int i = 0; i < sorts.length; i++) {
         String s = sorts[i];
 
         String[] spec = s.trim().split("\\s+"); //This should take into account spaces in the sort spec.
@@ -185,7 +173,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
         comps[i] = new FieldComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING);
       }
 
-      if(comps.length > 1) {
+      if (comps.length > 1) {
         return new MultipleFieldComparator(comps);
       } else {
         return comps[0];
@@ -194,40 +182,41 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
 
     @Override
     public void open() throws IOException {
-      docs = (SortDoc[]) context.get(SORT_DOCS_KEY);
-      queue = (SortQueue) context.get(SORT_QUEUE_KEY);
-      sortDoc = (SortDoc) context.get(SORT_DOC_KEY);
-      totalHits = (Integer) context.get(TOTAL_HITS_KEY);
-      exportWriter = (ExportWriter) context.get(EXPORT_WRITER_KEY);
-      leaves = (List<LeafReaderContext>) context.get(LEAF_READERS_KEY);
-      count = 0;
+      exportBuffers = (ExportBuffers) context.get(EXPORT_BUFFERS_KEY);
+      buffer = exportBuffers.getOutputBuffer();
     }
 
     @Override
     public void close() throws IOException {
-      exportWriter = null;
-      leaves = null;
+      exportBuffers = null;
     }
 
     @Override
     public Tuple read() throws IOException {
       if (pos < 0) {
-        if (count < totalHits) {
-          outDocIndex = exportWriter.fillOutDocs(leaves, sortDoc, queue, docs);
-          count += (outDocIndex + 1);
-          pos = outDocIndex;
-        } else {
+        try {
+          buffer.outDocsIndex = Buffer.EMPTY;
+          log.info("--- ews exchange empty buffer " + buffer);
+          buffer = exportBuffers.exchanger.exchange(buffer);
+          log.info("--- ews got new output buffer " + buffer);
+        } catch (InterruptedException e) {
+          log.info("--- ews interrupt");
+          Thread.currentThread().interrupt();
+          throw new IOException("interrupted");
+        }
+        if (buffer.outDocsIndex == Buffer.NO_MORE_DOCS) {
+          log.info("--- ews EOF");
           return Tuple.EOF();
+        } else {
+          pos = buffer.outDocsIndex;
+          log.info("--- ews new pos=" + pos);
         }
       }
       if (pos < 0) {
+        log.info("--- ews EOF?");
         return Tuple.EOF();
       }
-      Tuple tuple = new Tuple();
-      entryWriter.setTuple(tuple);
-      SortDoc s = docs[pos];
-      exportWriter.writeDoc(s, leaves, entryWriter);
-      s.reset();
+      Tuple tuple = new Tuple(buffer.outDocs[pos]);
       pos--;
       return tuple;
     }
@@ -442,41 +431,197 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
     }
   }
 
-  protected int transferBatchToArrayForOutput(SortQueue queue, SortDoc[] destinationArr) {
+  protected void transferBatchToBufferForOutput(SortQueue queue, SortDoc[] outDocs,
+                                                List<LeafReaderContext> leaves, Buffer destination) throws IOException {
     int outDocsIndex = -1;
     for (int i = 0; i < queue.maxSize; i++) {
       SortDoc s = queue.pop();
       if (s.docId > -1) {
-        destinationArr[++outDocsIndex] = s;
+        outDocs[++outDocsIndex] = s;
       }
     }
-
-    return outDocsIndex;
+    destination.outDocsIndex = outDocsIndex;
+    materializeDocs(leaves, outDocs, destination);
   }
 
-  protected void addDocsToItemWriter(List<LeafReaderContext> leaves, IteratorWriter.ItemWriter writer, SortDoc[] docsToExport, int outDocsIndex) throws IOException {
-    try {
-      for (int i = outDocsIndex; i >= 0; --i) {
-        SortDoc s = docsToExport[i];
-        writer.add((MapWriter) ew -> {
-          writeDoc(s, leaves, ew);
-          s.reset();
-        });
+  private static final class Buffer implements EntryWriter {
+    static final int EMPTY = -1;
+    static final int NO_MORE_DOCS = -2;
+
+    int outDocsIndex = EMPTY;
+    // use array-of-arrays instead of Map to conserve space
+    Object[][] outDocs;
+    int pos = EMPTY;
+
+    EntryWriter getEntryWriter(int pos, int numFields) {
+      if (outDocs == null) {
+        outDocs = new Object[outDocsIndex + 1][];
+      }
+      this.pos = pos;
+      Object[] fields = new Object[numFields << 1];
+      outDocs[pos] = fields;
+      return this;
+    }
+
+    @Override
+    public EntryWriter put(CharSequence k, Object v) throws IOException {
+      if (pos < 0) {
+        throw new IOException("Invalid entry position");
       }
-    } catch (Throwable e) {
-      Throwable ex = e;
-      while (ex != null) {
-        String m = ex.getMessage();
-        if (m != null && m.contains("Broken pipe")) {
-          throw new IgnoreException();
+      Object[] fields = outDocs[pos];
+      boolean putOk = false;
+      for (int i = 0; i < fields.length; i += 2) {
+        if (fields[i] == null || fields[i].equals(k)) {
+          fields[i] = k;
+          // convert everything complex into POJOs at this point
+          if (v instanceof IteratorWriter) {
+            List lst = new ArrayList();
+            ((IteratorWriter)v).toList(lst);
+            v = lst;
+          } else if (v instanceof MapWriter) {
+            Map<String, Object> map = new HashMap<>();
+            ((MapWriter)v).toMap(map);
+            v = map;
+          }
+          fields[i + 1] = v;
+          putOk = true;
+          break;
         }
-        ex = ex.getCause();
       }
+      if (!putOk) {
+        throw new IOException("should not happen! pos=" + pos + " ran out of space for field " + k + "=" + v
+            +  " - already full: " + Arrays.toString(fields));
+      }
+      return this;
+    }
 
-      if (e instanceof IOException) {
-        throw ((IOException) e);
-      } else {
-        throw new IOException(e);
+    public void writeItem(int pos, IteratorWriter.ItemWriter itemWriter) throws IOException {
+      final Object[] fields = outDocs[pos];
+      if (fields == null) {
+        log.info("--- writeItem no fields at " + pos);
+        return;
+      }
+      itemWriter.add((MapWriter) ew -> {
+        for (int i = 0; i < fields.length; i += 2) {
+          if (fields[i] == null) {
+            log.info("--- writeItem empty field at " + pos + "/" + i);
+            continue;
+          }
+          log.info("--- writeItem field " + pos + "/" + i);
+          ew.put((CharSequence)fields[i], fields[i + 1]);
+          log.info("--- writeItem done field " + pos + "/" + i);
+        }
+      });
+    }
+
+    public void reset() {
+      outDocsIndex = EMPTY;
+      pos = EMPTY;
+      outDocs = null;
+    }
+
+    @Override
+    public String toString() {
+      return "Buffer@" + Integer.toHexString(hashCode()) + "{" +
+          "outDocsIndex=" + outDocsIndex +
+          '}';
+    }
+  }
+
+  /* Helper class implementing a "double buffering" producer / consumer. */
+  private static final class ExportBuffers {
+    static final long EXCHANGE_TIMEOUT_SECONDS = 300;
+
+    final Buffer bufferOne;
+    final Buffer bufferTwo;
+    final Exchanger<Buffer> exchanger = new Exchanger<>();
+    final SortDoc[] outDocs;
+    final List<LeafReaderContext> leaves;
+    final ExportWriter exportWriter;
+    final int totalHits;
+    Buffer fillBuffer;
+    Buffer outputBuffer;
+    Runnable filler;
+    ExecutorService service;
+
+    ExportBuffers (ExportWriter exportWriter, List<LeafReaderContext> leaves, SolrIndexSearcher searcher, Sort sort, int queueSize, int totalHits) {
+      this.exportWriter = exportWriter;
+      this.leaves = leaves;
+      this.outDocs = new SortDoc[queueSize];
+      this.bufferOne = new Buffer();
+      this.bufferTwo = new Buffer();
+      this.totalHits = totalHits;
+      fillBuffer = bufferOne;
+      outputBuffer = bufferTwo;
+      filler = () -> {
+        try {
+          SortDoc sortDoc = exportWriter.getSortDoc(searcher, sort.getSort());
+          SortQueue queue = new SortQueue(queueSize, sortDoc);
+          for (int count = 0; count < totalHits; ) {
+            log.info("--- filler fillOutDocs in " + fillBuffer);
+            exportWriter.fillOutDocs(leaves, sortDoc, queue, outDocs, fillBuffer);
+            count += (fillBuffer.outDocsIndex + 1);
+            log.info("--- filler count=" + count + ", exchange buffer from " + fillBuffer);
+            fillBuffer = exchanger.exchange(fillBuffer, EXCHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+            log.info("--- filler got empty buffer " + fillBuffer);
+          }
+          fillBuffer.outDocsIndex = Buffer.NO_MORE_DOCS;
+          log.info("--- filler final exchange buffer from " + fillBuffer);
+          fillBuffer = exchanger.exchange(fillBuffer, EXCHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+          log.info("--- filler final got buffer " + fillBuffer);
+        } catch (Exception e) {
+          log.error("filler", e);
+          shutdownNow();
+        }
+      };
+    }
+
+    public Buffer getOutputBuffer() {
+      return outputBuffer;
+    }
+
+    public Buffer exchange(Buffer buffer) throws InterruptedException, TimeoutException {
+      return exchanger.exchange(buffer, EXCHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+    public void shutdownNow() {
+      if (service != null) {
+        log.info("--- shutting down buffers");
+        service.shutdownNow();
+      }
+    }
+
+    /**
+     * Start processing and block until complete or Exception is thrown.
+     * @param writer writer that exchanges and processes buffers received from a producer.
+     * @throws IOException on errors
+     */
+    public void run(Callable<Boolean> writer) throws IOException {
+      service = ExecutorUtil.newMDCAwareFixedThreadPool(2, new SolrNamedThreadFactory("ExportBuffers"));
+      try {
+        CompletableFuture.runAsync(filler, service);
+        writer.call();
+
+        // alternatively we could run the writer in a separate thread too:
+//        CompletableFuture<Void> allDone = CompletableFuture.allOf(
+//            CompletableFuture.runAsync(filler, service),
+//            CompletableFuture.runAsync(() -> {
+//              try {
+//                writer.call();
+//              } catch (Exception e) {
+//                log.error("writer", e);
+//                shutdownNow();
+//              }
+//            }, service)
+//        );
+//        allDone.join();
+        log.info("-- finished.");
+      } catch (Exception e) {
+        log.error("Exception running filler / writer", e);
+         //
+      } finally {
+        log.info("--- all done, shutting down buffers");
+        service.shutdownNow();
       }
     }
   }
@@ -486,48 +631,76 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
     SortDoc sortDoc = getSortDoc(req.getSearcher(), sort.getSort());
     final int queueSize = Math.min(DOCUMENT_BATCH_SIZE, totalHits);
 
-    SortQueue queue = new SortQueue(queueSize, sortDoc);
-    SortDoc[] outDocs = new SortDoc[queueSize];
+    ExportBuffers buffers = new ExportBuffers(this, leaves, req.getSearcher(), sort, queueSize, totalHits);
 
     if (streamExpression != null) {
-      streamContext.put(SORT_DOCS_KEY, outDocs);
-      streamContext.put(SORT_QUEUE_KEY, queue);
-      streamContext.put(SORT_DOC_KEY, sortDoc);
-      streamContext.put(TOTAL_HITS_KEY, totalHits);
       streamContext.put(EXPORT_WRITER_KEY, this);
+      streamContext.put(EXPORT_BUFFERS_KEY, buffers);
       streamContext.put(LEAF_READERS_KEY, leaves);
-      TupleStream tupleStream = createTupleStream();
+      final TupleStream tupleStream = createTupleStream();
       tupleStream.open();
-      for (;;) {
-        final Tuple t = tupleStream.read();
-        if (t == null) {
-          break;
-        }
-        if (t.EOF) {
-          break;
+      buffers.run(() -> {
+        for (;;) {
+          if (Thread.currentThread().isInterrupted()) {
+            break;
+          }
+          final Tuple t = tupleStream.read();
+          if (t == null) {
+            break;
+          }
+          if (t.EOF) {
+            break;
+          }
+          writer.add((MapWriter) ew -> t.writeMap(ew));
         }
-        writer.add((MapWriter) ew -> t.writeMap(ew));
-      }
+        return true;
+      });
       tupleStream.close();
     } else {
-      for (int count = 0; count < totalHits; ) {
-        int outDocsIndex = fillOutDocs(leaves, sortDoc, queue, outDocs);
-        count += (outDocsIndex + 1);
-        addDocsToItemWriter(leaves, writer, outDocs, outDocsIndex);
-      }
+      buffers.run(() -> {
+        Buffer buffer = buffers.getOutputBuffer();
+        log.info("--- writer init exchanging from " + buffer);
+        buffer = buffers.exchange(buffer);
+        log.info("--- writer init got " + buffer);
+        while (buffer.outDocsIndex != Buffer.NO_MORE_DOCS) {
+          if (Thread.currentThread().isInterrupted()) {
+            log.info("--- writer interrupted");
+            break;
+          }
+          for (int i = buffer.outDocsIndex; i >= 0; --i) {
+            buffer.writeItem(i, writer);
+          }
+          log.info("--- writer exchanging from " + buffer);
+          buffer = buffers.exchange(buffer);
+          log.info("--- writer got " + buffer);
+        }
+        return true;
+      });
     }
   }
 
-  private int fillOutDocs(List<LeafReaderContext> leaves, SortDoc sortDoc,
-                          SortQueue sortQueue, SortDoc[] outDocs) throws IOException {
+  private void fillOutDocs(List<LeafReaderContext> leaves, SortDoc sortDoc,
+                          SortQueue sortQueue, SortDoc[] outDocs, Buffer buffer) throws IOException {
     identifyLowestSortingUnexportedDocs(leaves, sortDoc, sortQueue);
-    return transferBatchToArrayForOutput(sortQueue, outDocs);
+    transferBatchToBufferForOutput(sortQueue, outDocs, leaves, buffer);
+  }
+
+  private void materializeDocs(List<LeafReaderContext> leaves, SortDoc[] outDocs, Buffer buffer) throws IOException {
+    log.info("--- materialize docs in " + buffer);
+    if (buffer.outDocsIndex < 0) {
+      return;
+    }
+    for (int i = buffer.outDocsIndex; i >= 0; i--) {
+      SortDoc sortDoc = outDocs[i];
+      EntryWriter ew = buffer.getEntryWriter(i, fieldWriters.length);
+      writeDoc(sortDoc, leaves, ew);
+      sortDoc.reset();
+    }
   }
 
   void writeDoc(SortDoc sortDoc,
                           List<LeafReaderContext> leaves,
                           EntryWriter ew) throws IOException {
-
     int ord = sortDoc.ord;
     FixedBitSet set = sets[ord];
     set.clear(sortDoc.docId);
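
The Buffer class above replaces a per-document Map with a single flat
Object[] of alternating key/value slots, pre-sized to two slots per field
writer; put(k, v) claims the first empty slot or overwrites a matching key.
A stripped-down sketch of just that storage layout (standalone
illustration; the real Buffer additionally converts IteratorWriter/MapWriter
values to plain lists and maps before storing them):

import java.io.IOException;
import java.util.Arrays;

/** Flat alternating key/value storage: slot 2i holds the key, 2i+1 the value. */
class FlatFields {
  final Object[] slots;

  FlatFields(int numFields) {
    slots = new Object[numFields << 1]; // two slots per field, as in the patch
  }

  void put(CharSequence k, Object v) throws IOException {
    for (int i = 0; i < slots.length; i += 2) {
      // claim the first empty slot, or overwrite an existing key
      if (slots[i] == null || slots[i].equals(k)) {
        slots[i] = k;
        slots[i + 1] = v;
        return;
      }
    }
    throw new IOException("ran out of space for " + k + " - already full: "
        + Arrays.toString(slots));
  }

  Object get(CharSequence k) {
    for (int i = 0; i < slots.length; i += 2) {
      if (k.equals(slots[i])) {
        return slots[i + 1];
      }
    }
    return null;
  }
}

Lookup is a linear scan, but for the handful of fields a typical /export
request selects, a short array avoids the per-entry node allocations of a
HashMap, which is the space saving the "array-of-arrays instead of Map"
comment refers to.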
diff --git a/solr/core/src/java/org/apache/solr/handler/export/SingleValueSortDoc.java b/solr/core/src/java/org/apache/solr/handler/export/SingleValueSortDoc.java
index 164c07b..963901c 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/SingleValueSortDoc.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/SingleValueSortDoc.java
@@ -32,11 +32,6 @@ class SingleValueSortDoc extends SortDoc {
     return null;
   }
 
-  @Override
-  public SortValue[] getSortValues() {
-    return new SortValue[] { value1 };
-  }
-
   public void setNextReader(LeafReaderContext context) throws IOException {
     this.ord = context.ord;
     this.docBase = context.docBase;
diff --git a/solr/core/src/java/org/apache/solr/handler/export/SortDoc.java b/solr/core/src/java/org/apache/solr/handler/export/SortDoc.java
index 292e795..5e2c75d 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/SortDoc.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/SortDoc.java
@@ -45,10 +45,6 @@ class SortDoc {
     return null;
   }
 
-  public SortValue[] getSortValues() {
-    return sortValues;
-  }
-
   public void setNextReader(LeafReaderContext context) throws IOException {
     this.ord = context.ord;
     this.docBase = context.docBase;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
index 2bdb2aa..3b58e86 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
@@ -91,6 +91,10 @@ public class Tuple implements Cloneable, MapWriter {
       throw new RuntimeException("must have a matching number of key-value pairs");
     }
     for (int i = 0; i < fields.length; i += 2) {
+      // skip empty entries
+      if (fields[i] == null) {
+        continue;
+      }
       put(fields[i], fields[i + 1]);
     }
   }
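
The small Tuple change above follows from the buffering scheme: because the
buffer's field arrays are pre-sized to fieldWriters.length, a document that
populates fewer fields leaves trailing null key slots, and the consumer must
skip them rather than insert null keys. A minimal sketch of that defensive
loop (hypothetical helper method, shown standalone rather than as the actual
Tuple constructor):

import java.util.LinkedHashMap;
import java.util.Map;

class TupleFieldsDemo {
  /** Copy alternating key/value pairs into a map, skipping unused (null-key) slots. */
  static Map<Object, Object> toMap(Object... fields) {
    if (fields.length % 2 != 0) {
      throw new RuntimeException("must have a matching number of key-value pairs");
    }
    Map<Object, Object> map = new LinkedHashMap<>();
    for (int i = 0; i < fields.length; i += 2) {
      if (fields[i] == null) {
        continue; // pre-sized buffer slot that was never claimed
      }
      map.put(fields[i], fields[i + 1]);
    }
    return map;
  }

  public static void main(String[] args) {
    // four slots allocated, only one field actually written
    Object[] slots = new Object[] { "id", "doc-1", null, null };
    System.out.println(toMap(slots)); // prints {id=doc-1}
  }
}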