You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2020/06/15 18:01:09 UTC

[lucene-solr] branch jira/solr-14537 updated: SOLR-14537: Replace Exchanger with CyclicBarrier for better exception control.

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch jira/solr-14537
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/jira/solr-14537 by this push:
     new 42ee2e3  SOLR-14537: Replace Exchanger with CyclicBarrier for better exception control.
42ee2e3 is described below

commit 42ee2e340b1e0646e40abe181b4f60be069a6de9
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Mon Jun 15 20:00:27 2020 +0200

    SOLR-14537: Replace Exchanger with CyclicBarrier for better exception control.
---
 .../apache/solr/handler/export/ExportBuffers.java  | 70 ++++++++++++++++------
 .../apache/solr/handler/export/ExportWriter.java   | 14 +++--
 .../solr/handler/export/ExportWriterStream.java    | 40 +++++++++++--
 .../solr/handler/export/TestExportWriter.java      | 15 ++++-
 4 files changed, 111 insertions(+), 28 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java b/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
index 5525f81..66d0018 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
@@ -26,10 +26,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Exchanger;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.lucene.index.LeafReaderContext;
@@ -53,19 +52,21 @@ class ExportBuffers {
 
   final Buffer bufferOne;
   final Buffer bufferTwo;
-  final Exchanger<Buffer> exchanger = new Exchanger<>();
   final SortDoc[] outDocs;
   final List<LeafReaderContext> leaves;
   final ExportWriter exportWriter;
   final OutputStream os;
   final IteratorWriter.ItemWriter rawWriter;
   final IteratorWriter.ItemWriter writer;
+  final CyclicBarrier barrier;
   final int totalHits;
   Buffer fillBuffer;
   Buffer outputBuffer;
   Runnable filler;
   ExecutorService service;
+  Throwable error;
   LongAdder outputCounter = new LongAdder();
+  volatile boolean shutDown = false;
 
   ExportBuffers(ExportWriter exportWriter, List<LeafReaderContext> leaves, SolrIndexSearcher searcher,
                 OutputStream os, IteratorWriter.ItemWriter rawWriter, Sort sort, int queueSize, int totalHits) {
@@ -87,34 +88,62 @@ class ExportBuffers {
     this.totalHits = totalHits;
     fillBuffer = bufferOne;
     outputBuffer = bufferTwo;
+    barrier = new CyclicBarrier(2, () -> swapBuffers());
     filler = () -> {
       try {
+        log.info("--- filler start " + Thread.currentThread());
+        Buffer buffer = getFillBuffer();
         SortDoc sortDoc = exportWriter.getSortDoc(searcher, sort.getSort());
         SortQueue queue = new SortQueue(queueSize, sortDoc);
         long lastOutputCounter = 0;
         for (int count = 0; count < totalHits; ) {
           log.info("--- filler fillOutDocs in " + fillBuffer);
-          exportWriter.fillOutDocs(leaves, sortDoc, queue, outDocs, fillBuffer);
-          count += (fillBuffer.outDocsIndex + 1);
-          log.info("--- filler count=" + count + ", exchange buffer from " + fillBuffer);
-          fillBuffer = exchange(fillBuffer);
+          exportWriter.fillOutDocs(leaves, sortDoc, queue, outDocs, buffer);
+          count += (buffer.outDocsIndex + 1);
+          log.info("--- filler count=" + count + ", exchange buffer from " + buffer);
+          exchangeBuffers();
+          buffer = getFillBuffer();
           if (outputCounter.longValue() > lastOutputCounter) {
             lastOutputCounter = outputCounter.longValue();
             flushOutput();
           }
-          log.info("--- filler got empty buffer " + fillBuffer);
+          log.info("--- filler got empty buffer " + buffer);
         }
-        fillBuffer.outDocsIndex = Buffer.NO_MORE_DOCS;
-        log.info("--- filler final exchange buffer from " + fillBuffer);
-        fillBuffer = exchange(fillBuffer);
-        log.info("--- filler final got buffer " + fillBuffer);
+        buffer.outDocsIndex = Buffer.NO_MORE_DOCS;
+        log.info("--- filler final exchange buffer from " + buffer);
+        exchangeBuffers();
+        buffer = getFillBuffer();
+        log.info("--- filler final got buffer " + buffer);
       } catch (Exception e) {
         log.error("filler", e);
+        error(e);
         shutdownNow();
       }
     };
   }
 
+  public void exchangeBuffers() throws Exception {
+    log.info("---- wait from " + Thread.currentThread());
+    barrier.await(EXCHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+  }
+
+  public void error(Throwable t) {
+    error = t;
+    // break the lock on the other thread too
+    barrier.reset();
+  }
+
+  public Throwable getError() {
+    return error;
+  }
+
+  private void swapBuffers() {
+    log.info("--- swap buffers");
+    Buffer one = fillBuffer;
+    fillBuffer = outputBuffer;
+    outputBuffer = one;
+  }
+
   private void flushOutput() throws IOException {
     os.flush();
   }
@@ -124,20 +153,26 @@ class ExportBuffers {
     return outputBuffer;
   }
 
+  public Buffer getFillBuffer() {
+    return fillBuffer;
+  }
+
   // decorated writer that keeps track of number of writes
   public IteratorWriter.ItemWriter getWriter() {
     return writer;
   }
 
-  public Buffer exchange(Buffer buffer) throws InterruptedException, TimeoutException {
-    return exchanger.exchange(buffer, EXCHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-  }
-
   public void shutdownNow() {
     if (service != null) {
       log.info("--- shutting down buffers");
       service.shutdownNow();
+      service = null;
     }
+    shutDown = true;
+  }
+
+  public boolean isShutDown() {
+    return shutDown;
   }
 
   /**
@@ -168,10 +203,11 @@ class ExportBuffers {
       log.info("-- finished.");
     } catch (Exception e) {
       log.error("Exception running filler / writer", e);
+      error(e);
       //
     } finally {
       log.info("--- all done, shutting down buffers");
-      service.shutdownNow();
+      shutdownNow();
     }
   }
 
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
index 78bf2ef..1d161f5 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
@@ -331,17 +331,20 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
             t = tupleStream.read();
           } catch (final Exception e) {
             buffers.getWriter().add((MapWriter) ew -> Tuple.EXCEPTION(e, true).writeMap(ew));
-            throw e;
+            break;
           }
           if (t == null) {
             break;
           }
-          if (t.EOF) {
+          if (t.EOF && !t.EXCEPTION) {
             break;
           }
           // use decorated writer to monitor the number of output writes
           // and flush the output quickly in case of very few (reduced) output items
           buffers.getWriter().add((MapWriter) ew -> t.writeMap(ew));
+          if (t.EXCEPTION && t.EOF) {
+            break;
+          }
         }
         return true;
       });
@@ -349,9 +352,9 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
     } else {
       buffers.run(() -> {
         // get the initial buffer
+        log.info("--- writer init exchanging from empty");
+        buffers.exchangeBuffers();
         ExportBuffers.Buffer buffer = buffers.getOutputBuffer();
-        log.info("--- writer init exchanging from " + buffer);
-        buffer = buffers.exchange(buffer);
         log.info("--- writer init got " + buffer);
         while (buffer.outDocsIndex != ExportBuffers.Buffer.NO_MORE_DOCS) {
           if (Thread.currentThread().isInterrupted()) {
@@ -365,7 +368,8 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
             buffer.writeItem(i, writer);
           }
           log.info("--- writer exchanging from " + buffer);
-          buffer = buffers.exchange(buffer);
+          buffers.exchangeBuffers();
+          buffer = buffers.getOutputBuffer();
           log.info("--- writer got " + buffer);
         }
         return true;
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java
index 0a783a3..2e58304 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java
@@ -19,6 +19,8 @@ package org.apache.solr.handler.export;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.List;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
@@ -111,12 +113,42 @@ public class ExportWriterStream extends TupleStream implements Expressible {
       try {
         buffer.outDocsIndex = ExportBuffers.Buffer.EMPTY;
         log.info("--- ews exchange empty buffer " + buffer);
-        buffer = exportBuffers.exchanger.exchange(buffer);
-        log.info("--- ews got new output buffer " + buffer);
+        boolean exchanged = false;
+        while (!exchanged) {
+          try {
+            exportBuffers.exchangeBuffers();
+            exchanged = true;
+          } catch (TimeoutException e) {
+            log.info("--- ews timeout loop");
+            if (exportBuffers.isShutDown()) {
+              log.info("--- ews - the other end is shutdown, returning EOF");
+              return Tuple.EOF();
+            }
+            continue;
+          } catch (InterruptedException e) {
+            log.info("--- ews interrupted");
+            exportBuffers.error(e);
+            return Tuple.EXCEPTION(e, true);
+          } catch (BrokenBarrierException e) {
+            if (exportBuffers.getError() != null) {
+              return Tuple.EXCEPTION(exportBuffers.getError(), true);
+            } else {
+              return Tuple.EXCEPTION(e, true);
+            }
+          }
+        }
       } catch (InterruptedException e) {
         log.info("--- ews interrupt");
-        Thread.currentThread().interrupt();
-        throw new IOException("interrupted");
+        exportBuffers.error(e);
+        return Tuple.EXCEPTION(e, true);
+      } catch (Exception e) {
+        log.info("--- ews exception", e);
+        exportBuffers.error(e);
+        return Tuple.EXCEPTION(e, true);
+      }
+      buffer = exportBuffers.getOutputBuffer();
+      if (buffer == null) {
+        return Tuple.EOF();
       }
       if (buffer.outDocsIndex == ExportBuffers.Buffer.NO_MORE_DOCS) {
         log.info("--- ews EOF");
diff --git a/solr/core/src/test/org/apache/solr/handler/export/TestExportWriter.java b/solr/core/src/test/org/apache/solr/handler/export/TestExportWriter.java
index 148acbd..a6c2319 100644
--- a/solr/core/src/test/org/apache/solr/handler/export/TestExportWriter.java
+++ b/solr/core/src/test/org/apache/solr/handler/export/TestExportWriter.java
@@ -32,6 +32,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.lucene.util.TestUtil;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.StreamParams;
 import org.apache.solr.common.util.SuppressForbidden;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.index.LogDocMergePolicyFactory;
@@ -709,8 +710,8 @@ public class TestExportWriter extends SolrTestCaseJ4 {
   }
 
   private void createLargeIndex() throws Exception {
-    int BATCH_SIZE = 1000;
-    int NUM_BATCHES = 100;
+    int BATCH_SIZE = 5000;
+    int NUM_BATCHES = 20;
     SolrInputDocument[] docs = new SolrInputDocument[BATCH_SIZE];
     for (int i = 0; i < NUM_BATCHES; i++) {
       for (int j = 0; j < BATCH_SIZE; j++) {
@@ -770,6 +771,16 @@ public class TestExportWriter extends SolrTestCaseJ4 {
       assertNotNull("missing count: " + doc, doc.get("count(*)"));
       assertEquals(1000.0, ((Number)doc.get("count(*)")).doubleValue(), 200.0);
     }
+    // try invalid field types
+    req = req("q", "*:*", "qt", "/export", "fl", "id,sortabledv,small_i_p", "sort", "sortabledv asc", "expr", "unique(input(),over=\"sortabledv\")");
+    rsp = h.query(req);
+    rspMap = mapper.readValue(rsp, HashMap.class);
+    assertEquals("wrong response status", 400, ((Number)Utils.getObjectByPath(rspMap, false, "/responseHeader/status")).intValue());
+    docs = (List<Map<String, Object>>) Utils.getObjectByPath(rspMap, false, "/response/docs");
+    assertEquals("wrong number of docs", 1, docs.size());
+    Map<String, Object> doc = docs.get(0);
+    assertTrue("doc doesn't have exception", doc.containsKey(StreamParams.EXCEPTION));
+    assertTrue("wrong exception message", doc.get(StreamParams.EXCEPTION).toString().contains("Must have useDocValuesAsStored='true'"));
   }
 
   private void validateSort(int numDocs) throws Exception {