You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2020/06/15 18:01:09 UTC
[lucene-solr] branch jira/solr-14537 updated: SOLR-14537: Replace
Exchanger with CyclicBarrier for better exception control.
This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch jira/solr-14537
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/jira/solr-14537 by this push:
new 42ee2e3 SOLR-14537: Replace Exchanger with CyclicBarrier for better exception control.
42ee2e3 is described below
commit 42ee2e340b1e0646e40abe181b4f60be069a6de9
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Mon Jun 15 20:00:27 2020 +0200
SOLR-14537: Replace Exchanger with CyclicBarrier for better exception control.
---
.../apache/solr/handler/export/ExportBuffers.java | 70 ++++++++++++++++------
.../apache/solr/handler/export/ExportWriter.java | 14 +++--
.../solr/handler/export/ExportWriterStream.java | 40 +++++++++++--
.../solr/handler/export/TestExportWriter.java | 15 ++++-
4 files changed, 111 insertions(+), 28 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java b/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
index 5525f81..66d0018 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
@@ -26,10 +26,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Exchanger;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.LongAdder;
import org.apache.lucene.index.LeafReaderContext;
@@ -53,19 +52,21 @@ class ExportBuffers {
final Buffer bufferOne;
final Buffer bufferTwo;
- final Exchanger<Buffer> exchanger = new Exchanger<>();
final SortDoc[] outDocs;
final List<LeafReaderContext> leaves;
final ExportWriter exportWriter;
final OutputStream os;
final IteratorWriter.ItemWriter rawWriter;
final IteratorWriter.ItemWriter writer;
+ final CyclicBarrier barrier;
final int totalHits;
Buffer fillBuffer;
Buffer outputBuffer;
Runnable filler;
ExecutorService service;
+ Throwable error;
LongAdder outputCounter = new LongAdder();
+ volatile boolean shutDown = false;
ExportBuffers(ExportWriter exportWriter, List<LeafReaderContext> leaves, SolrIndexSearcher searcher,
OutputStream os, IteratorWriter.ItemWriter rawWriter, Sort sort, int queueSize, int totalHits) {
@@ -87,34 +88,62 @@ class ExportBuffers {
this.totalHits = totalHits;
fillBuffer = bufferOne;
outputBuffer = bufferTwo;
+ barrier = new CyclicBarrier(2, () -> swapBuffers());
filler = () -> {
try {
+ log.info("--- filler start " + Thread.currentThread());
+ Buffer buffer = getFillBuffer();
SortDoc sortDoc = exportWriter.getSortDoc(searcher, sort.getSort());
SortQueue queue = new SortQueue(queueSize, sortDoc);
long lastOutputCounter = 0;
for (int count = 0; count < totalHits; ) {
log.info("--- filler fillOutDocs in " + fillBuffer);
- exportWriter.fillOutDocs(leaves, sortDoc, queue, outDocs, fillBuffer);
- count += (fillBuffer.outDocsIndex + 1);
- log.info("--- filler count=" + count + ", exchange buffer from " + fillBuffer);
- fillBuffer = exchange(fillBuffer);
+ exportWriter.fillOutDocs(leaves, sortDoc, queue, outDocs, buffer);
+ count += (buffer.outDocsIndex + 1);
+ log.info("--- filler count=" + count + ", exchange buffer from " + buffer);
+ exchangeBuffers();
+ buffer = getFillBuffer();
if (outputCounter.longValue() > lastOutputCounter) {
lastOutputCounter = outputCounter.longValue();
flushOutput();
}
- log.info("--- filler got empty buffer " + fillBuffer);
+ log.info("--- filler got empty buffer " + buffer);
}
- fillBuffer.outDocsIndex = Buffer.NO_MORE_DOCS;
- log.info("--- filler final exchange buffer from " + fillBuffer);
- fillBuffer = exchange(fillBuffer);
- log.info("--- filler final got buffer " + fillBuffer);
+ buffer.outDocsIndex = Buffer.NO_MORE_DOCS;
+ log.info("--- filler final exchange buffer from " + buffer);
+ exchangeBuffers();
+ buffer = getFillBuffer();
+ log.info("--- filler final got buffer " + buffer);
} catch (Exception e) {
log.error("filler", e);
+ error(e);
shutdownNow();
}
};
}
+ public void exchangeBuffers() throws Exception {
+ log.info("---- wait from " + Thread.currentThread());
+ barrier.await(EXCHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ }
+
+ public void error(Throwable t) {
+ error = t;
+ // break the lock on the other thread too
+ barrier.reset();
+ }
+
+ public Throwable getError() {
+ return error;
+ }
+
+ private void swapBuffers() {
+ log.info("--- swap buffers");
+ Buffer one = fillBuffer;
+ fillBuffer = outputBuffer;
+ outputBuffer = one;
+ }
+
private void flushOutput() throws IOException {
os.flush();
}
@@ -124,20 +153,26 @@ class ExportBuffers {
return outputBuffer;
}
+ public Buffer getFillBuffer() {
+ return fillBuffer;
+ }
+
// decorated writer that keeps track of number of writes
public IteratorWriter.ItemWriter getWriter() {
return writer;
}
- public Buffer exchange(Buffer buffer) throws InterruptedException, TimeoutException {
- return exchanger.exchange(buffer, EXCHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
- }
-
public void shutdownNow() {
if (service != null) {
log.info("--- shutting down buffers");
service.shutdownNow();
+ service = null;
}
+ shutDown = true;
+ }
+
+ public boolean isShutDown() {
+ return shutDown;
}
/**
@@ -168,10 +203,11 @@ class ExportBuffers {
log.info("-- finished.");
} catch (Exception e) {
log.error("Exception running filler / writer", e);
+ error(e);
//
} finally {
log.info("--- all done, shutting down buffers");
- service.shutdownNow();
+ shutdownNow();
}
}
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
index 78bf2ef..1d161f5 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
@@ -331,17 +331,20 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
t = tupleStream.read();
} catch (final Exception e) {
buffers.getWriter().add((MapWriter) ew -> Tuple.EXCEPTION(e, true).writeMap(ew));
- throw e;
+ break;
}
if (t == null) {
break;
}
- if (t.EOF) {
+ if (t.EOF && !t.EXCEPTION) {
break;
}
// use decorated writer to monitor the number of output writes
// and flush the output quickly in case of very few (reduced) output items
buffers.getWriter().add((MapWriter) ew -> t.writeMap(ew));
+ if (t.EXCEPTION && t.EOF) {
+ break;
+ }
}
return true;
});
@@ -349,9 +352,9 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
} else {
buffers.run(() -> {
// get the initial buffer
+ log.info("--- writer init exchanging from empty");
+ buffers.exchangeBuffers();
ExportBuffers.Buffer buffer = buffers.getOutputBuffer();
- log.info("--- writer init exchanging from " + buffer);
- buffer = buffers.exchange(buffer);
log.info("--- writer init got " + buffer);
while (buffer.outDocsIndex != ExportBuffers.Buffer.NO_MORE_DOCS) {
if (Thread.currentThread().isInterrupted()) {
@@ -365,7 +368,8 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
buffer.writeItem(i, writer);
}
log.info("--- writer exchanging from " + buffer);
- buffer = buffers.exchange(buffer);
+ buffers.exchangeBuffers();
+ buffer = buffers.getOutputBuffer();
log.info("--- writer got " + buffer);
}
return true;
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java
index 0a783a3..2e58304 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java
@@ -19,6 +19,8 @@ package org.apache.solr.handler.export;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.List;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.TimeoutException;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
@@ -111,12 +113,42 @@ public class ExportWriterStream extends TupleStream implements Expressible {
try {
buffer.outDocsIndex = ExportBuffers.Buffer.EMPTY;
log.info("--- ews exchange empty buffer " + buffer);
- buffer = exportBuffers.exchanger.exchange(buffer);
- log.info("--- ews got new output buffer " + buffer);
+ boolean exchanged = false;
+ while (!exchanged) {
+ try {
+ exportBuffers.exchangeBuffers();
+ exchanged = true;
+ } catch (TimeoutException e) {
+ log.info("--- ews timeout loop");
+ if (exportBuffers.isShutDown()) {
+ log.info("--- ews - the other end is shutdown, returning EOF");
+ return Tuple.EOF();
+ }
+ continue;
+ } catch (InterruptedException e) {
+ log.info("--- ews interrupted");
+ exportBuffers.error(e);
+ return Tuple.EXCEPTION(e, true);
+ } catch (BrokenBarrierException e) {
+ if (exportBuffers.getError() != null) {
+ return Tuple.EXCEPTION(exportBuffers.getError(), true);
+ } else {
+ return Tuple.EXCEPTION(e, true);
+ }
+ }
+ }
} catch (InterruptedException e) {
log.info("--- ews interrupt");
- Thread.currentThread().interrupt();
- throw new IOException("interrupted");
+ exportBuffers.error(e);
+ return Tuple.EXCEPTION(e, true);
+ } catch (Exception e) {
+ log.info("--- ews exception", e);
+ exportBuffers.error(e);
+ return Tuple.EXCEPTION(e, true);
+ }
+ buffer = exportBuffers.getOutputBuffer();
+ if (buffer == null) {
+ return Tuple.EOF();
}
if (buffer.outDocsIndex == ExportBuffers.Buffer.NO_MORE_DOCS) {
log.info("--- ews EOF");
diff --git a/solr/core/src/test/org/apache/solr/handler/export/TestExportWriter.java b/solr/core/src/test/org/apache/solr/handler/export/TestExportWriter.java
index 148acbd..a6c2319 100644
--- a/solr/core/src/test/org/apache/solr/handler/export/TestExportWriter.java
+++ b/solr/core/src/test/org/apache/solr/handler/export/TestExportWriter.java
@@ -32,6 +32,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.StreamParams;
import org.apache.solr.common.util.SuppressForbidden;
import org.apache.solr.common.util.Utils;
import org.apache.solr.index.LogDocMergePolicyFactory;
@@ -709,8 +710,8 @@ public class TestExportWriter extends SolrTestCaseJ4 {
}
private void createLargeIndex() throws Exception {
- int BATCH_SIZE = 1000;
- int NUM_BATCHES = 100;
+ int BATCH_SIZE = 5000;
+ int NUM_BATCHES = 20;
SolrInputDocument[] docs = new SolrInputDocument[BATCH_SIZE];
for (int i = 0; i < NUM_BATCHES; i++) {
for (int j = 0; j < BATCH_SIZE; j++) {
@@ -770,6 +771,16 @@ public class TestExportWriter extends SolrTestCaseJ4 {
assertNotNull("missing count: " + doc, doc.get("count(*)"));
assertEquals(1000.0, ((Number)doc.get("count(*)")).doubleValue(), 200.0);
}
+ // try invalid field types
+ req = req("q", "*:*", "qt", "/export", "fl", "id,sortabledv,small_i_p", "sort", "sortabledv asc", "expr", "unique(input(),over=\"sortabledv\")");
+ rsp = h.query(req);
+ rspMap = mapper.readValue(rsp, HashMap.class);
+ assertEquals("wrong response status", 400, ((Number)Utils.getObjectByPath(rspMap, false, "/responseHeader/status")).intValue());
+ docs = (List<Map<String, Object>>) Utils.getObjectByPath(rspMap, false, "/response/docs");
+ assertEquals("wrong number of docs", 1, docs.size());
+ Map<String, Object> doc = docs.get(0);
+ assertTrue("doc doesn't have exception", doc.containsKey(StreamParams.EXCEPTION));
+ assertTrue("wrong exception message", doc.get(StreamParams.EXCEPTION).toString().contains("Must have useDocValuesAsStored='true'"));
}
private void validateSort(int numDocs) throws Exception {