You are viewing a plain text version of this content. The canonical (HTML) link was not preserved in this plain-text rendering; see the original mailing list archive for it.
Posted to commits@lucene.apache.org by ab...@apache.org on 2020/06/15 08:56:44 UTC
[lucene-solr] branch jira/solr-14537 updated: SOLR-14537: Refactor
for clarity - move some inner classes to top level.
This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch jira/solr-14537
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/jira/solr-14537 by this push:
new ea97e7f SOLR-14537: Refactor for clarity - move some inner classes to top level.
ea97e7f is described below
commit ea97e7f9d23227dfe5e0afef0c61ece4918dca0c
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Mon Jun 15 10:55:47 2020 +0200
SOLR-14537: Refactor for clarity - move some inner classes to top level.
---
.../org/apache/solr/handler/ExportHandler.java | 3 +-
.../apache/solr/handler/export/ExportBuffers.java | 259 +++++++++++++++
.../apache/solr/handler/export/ExportWriter.java | 350 ++-------------------
.../solr/handler/export/ExportWriterStream.java | 157 +++++++++
4 files changed, 437 insertions(+), 332 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/handler/ExportHandler.java b/solr/core/src/java/org/apache/solr/handler/ExportHandler.java
index 04800a3..a89eeae 100644
--- a/solr/core/src/java/org/apache/solr/handler/ExportHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/ExportHandler.java
@@ -34,6 +34,7 @@ import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.SearchHandler;
import org.apache.solr.handler.export.ExportWriter;
+import org.apache.solr.handler.export.ExportWriterStream;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.slf4j.Logger;
@@ -66,7 +67,7 @@ public class ExportHandler extends SearchHandler {
for (String function : forbiddenStreams) {
this.withoutFunctionName(function);
}
- this.withFunctionName("input", ExportWriter.ExportWriterStream.class);
+ this.withFunctionName("input", ExportWriterStream.class);
}
}
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java b/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
new file mode 100644
index 0000000..5525f81
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportBuffers.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.export;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Exchanger;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.Sort;
+import org.apache.solr.common.IteratorWriter;
+import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class implementing a "double buffering" producer / consumer.
+ */
+class ExportBuffers {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ static final long EXCHANGE_TIMEOUT_SECONDS = 600;
+ static final String EXPORT_BUFFERS_KEY = "__eb__";
+
+ final Buffer bufferOne;
+ final Buffer bufferTwo;
+ final Exchanger<Buffer> exchanger = new Exchanger<>();
+ final SortDoc[] outDocs;
+ final List<LeafReaderContext> leaves;
+ final ExportWriter exportWriter;
+ final OutputStream os;
+ final IteratorWriter.ItemWriter rawWriter;
+ final IteratorWriter.ItemWriter writer;
+ final int totalHits;
+ Buffer fillBuffer;
+ Buffer outputBuffer;
+ Runnable filler;
+ ExecutorService service;
+ LongAdder outputCounter = new LongAdder();
+
+ ExportBuffers(ExportWriter exportWriter, List<LeafReaderContext> leaves, SolrIndexSearcher searcher,
+ OutputStream os, IteratorWriter.ItemWriter rawWriter, Sort sort, int queueSize, int totalHits) {
+ this.exportWriter = exportWriter;
+ this.leaves = leaves;
+ this.os = os;
+ this.rawWriter = rawWriter;
+ this.writer = new IteratorWriter.ItemWriter() {
+ @Override
+ public IteratorWriter.ItemWriter add(Object o) throws IOException {
+ rawWriter.add(o);
+ outputCounter.increment();
+ return this;
+ }
+ };
+ this.outDocs = new SortDoc[queueSize];
+ this.bufferOne = new Buffer();
+ this.bufferTwo = new Buffer();
+ this.totalHits = totalHits;
+ fillBuffer = bufferOne;
+ outputBuffer = bufferTwo;
+ filler = () -> {
+ try {
+ SortDoc sortDoc = exportWriter.getSortDoc(searcher, sort.getSort());
+ SortQueue queue = new SortQueue(queueSize, sortDoc);
+ long lastOutputCounter = 0;
+ for (int count = 0; count < totalHits; ) {
+ log.info("--- filler fillOutDocs in " + fillBuffer);
+ exportWriter.fillOutDocs(leaves, sortDoc, queue, outDocs, fillBuffer);
+ count += (fillBuffer.outDocsIndex + 1);
+ log.info("--- filler count=" + count + ", exchange buffer from " + fillBuffer);
+ fillBuffer = exchange(fillBuffer);
+ if (outputCounter.longValue() > lastOutputCounter) {
+ lastOutputCounter = outputCounter.longValue();
+ flushOutput();
+ }
+ log.info("--- filler got empty buffer " + fillBuffer);
+ }
+ fillBuffer.outDocsIndex = Buffer.NO_MORE_DOCS;
+ log.info("--- filler final exchange buffer from " + fillBuffer);
+ fillBuffer = exchange(fillBuffer);
+ log.info("--- filler final got buffer " + fillBuffer);
+ } catch (Exception e) {
+ log.error("filler", e);
+ shutdownNow();
+ }
+ };
+ }
+
+ private void flushOutput() throws IOException {
+ os.flush();
+ }
+
+ // initial output buffer
+ public Buffer getOutputBuffer() {
+ return outputBuffer;
+ }
+
+ // decorated writer that keeps track of number of writes
+ public IteratorWriter.ItemWriter getWriter() {
+ return writer;
+ }
+
+ public Buffer exchange(Buffer buffer) throws InterruptedException, TimeoutException {
+ return exchanger.exchange(buffer, EXCHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ }
+
+ public void shutdownNow() {
+ if (service != null) {
+ log.info("--- shutting down buffers");
+ service.shutdownNow();
+ }
+ }
+
+ /**
+ * Start processing and block until complete or Exception is thrown.
+ *
+ * @param writer writer that exchanges and processes buffers received from a producer.
+ * @throws IOException on errors
+ */
+ public void run(Callable<Boolean> writer) throws IOException {
+ service = ExecutorUtil.newMDCAwareFixedThreadPool(1, new SolrNamedThreadFactory("ExportBuffers"));
+ try {
+ CompletableFuture.runAsync(filler, service);
+ writer.call();
+
+ // alternatively we could run the writer in a separate thread:
+// CompletableFuture<Void> allDone = CompletableFuture.allOf(
+// CompletableFuture.runAsync(filler, service),
+// CompletableFuture.runAsync(() -> {
+// try {
+// writer.call();
+// } catch (Exception e) {
+// log.error("writer", e);
+// shutdownNow();
+// }
+// }, service)
+// );
+// allDone.join();
+ log.info("-- finished.");
+ } catch (Exception e) {
+ log.error("Exception running filler / writer", e);
+ //
+ } finally {
+ log.info("--- all done, shutting down buffers");
+ service.shutdownNow();
+ }
+ }
+
+ public static final class Buffer implements MapWriter.EntryWriter {
+ static final int EMPTY = -1;
+ static final int NO_MORE_DOCS = -2;
+
+ int outDocsIndex = EMPTY;
+ // use array-of-arrays instead of Map to conserve space
+ Object[][] outDocs;
+ int pos = EMPTY;
+
+ MapWriter.EntryWriter getEntryWriter(int pos, int numFields) {
+ if (outDocs == null) {
+ outDocs = new Object[outDocsIndex + 1][];
+ }
+ this.pos = pos;
+ Object[] fields = new Object[numFields << 1];
+ outDocs[pos] = fields;
+ return this;
+ }
+
+ @Override
+ public MapWriter.EntryWriter put(CharSequence k, Object v) throws IOException {
+ if (pos < 0) {
+ throw new IOException("Invalid entry position");
+ }
+ Object[] fields = outDocs[pos];
+ boolean putOk = false;
+ for (int i = 0; i < fields.length; i += 2) {
+ if (fields[i] == null || fields[i].equals(k)) {
+ fields[i] = k;
+ // convert everything complex into POJOs at this point
+ // to avoid accessing docValues or termEnums from other threads
+ if (v instanceof IteratorWriter) {
+ List lst = new ArrayList();
+ ((IteratorWriter)v).toList(lst);
+ v = lst;
+ } else if (v instanceof MapWriter) {
+ Map<String, Object> map = new HashMap<>();
+ ((MapWriter)v).toMap(map);
+ v = map;
+ }
+ fields[i + 1] = v;
+ putOk = true;
+ break;
+ }
+ }
+ if (!putOk) {
+ throw new IOException("should not happen! pos=" + pos + " ran out of space for field " + k + "=" + v
+ + " - already full: " + Arrays.toString(fields));
+ }
+ return this;
+ }
+
+ // helper method to make it easier to write our internal key-value array as if it were a map
+ public void writeItem(int pos, IteratorWriter.ItemWriter itemWriter) throws IOException {
+ final Object[] fields = outDocs[pos];
+ if (fields == null) {
+ return;
+ }
+ itemWriter.add((MapWriter) ew -> {
+ for (int i = 0; i < fields.length; i += 2) {
+ if (fields[i] == null) {
+ continue;
+ }
+ ew.put((CharSequence)fields[i], fields[i + 1]);
+ }
+ });
+ }
+
+ public void reset() {
+ outDocsIndex = EMPTY;
+ pos = EMPTY;
+ outDocs = null;
+ }
+
+ @Override
+ public String toString() {
+ return "Buffer@" + Integer.toHexString(hashCode()) + "{" +
+ "outDocsIndex=" + outDocsIndex +
+ '}';
+ }
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
index 813db92..36bc40a 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
@@ -24,17 +24,7 @@ import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Exchanger;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
@@ -46,18 +36,10 @@ import org.apache.lucene.util.BitSetIterator;
import org.apache.lucene.util.FixedBitSet;
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
import org.apache.solr.client.solrj.io.Tuple;
-import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
-import org.apache.solr.client.solrj.io.comp.FieldComparator;
-import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
-import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
-import org.apache.solr.client.solrj.io.stream.expr.Explanation;
-import org.apache.solr.client.solrj.io.stream.expr.Expressible;
-import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
-import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.IteratorWriter;
@@ -68,9 +50,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.StreamParams;
-import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.JavaBinCodec;
-import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestInfo;
@@ -109,7 +89,7 @@ import static org.apache.solr.common.util.Utils.makeMap;
* Priority Queue. They are then exported (written across the wire) and marked as sent (unset in the bitmap).
* This process repeats until all matching documents have been sent.
* <p>
- * This streaming approach is light on memory (only {@link #DOCUMENT_BATCH_SIZE} documents are ever stored in memory at
+ * This streaming approach is light on memory (only up to 2x {@link #DOCUMENT_BATCH_SIZE} documents are ever stored in memory at
* once), and it allows {@link ExportWriter} to scale well with regard to numDocs.
*/
public class ExportWriter implements SolrCore.RawWriter, Closeable {
@@ -117,10 +97,6 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
private static final int DOCUMENT_BATCH_SIZE = 30000;
- private static final String EXPORT_WRITER_KEY = "__ew__";
- private static final String EXPORT_BUFFERS_KEY = "__eb__";
- private static final String LEAF_READERS_KEY = "__leaves__";
-
private OutputStreamWriter respWriter;
final SolrQueryRequest req;
final SolrQueryResponse res;
@@ -133,115 +109,6 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
PushWriter writer;
private String wt;
- public static class ExportWriterStream extends TupleStream implements Expressible {
- StreamContext context;
- StreamComparator streamComparator;
- int pos = -1;
- ExportBuffers exportBuffers;
- Buffer buffer;
-
- public ExportWriterStream(StreamExpression expression, StreamFactory factory) throws IOException {
- streamComparator = parseComp(factory.getDefaultSort());
- }
-
- @Override
- public void setStreamContext(StreamContext context) {
- this.context = context;
- }
-
- @Override
- public List<TupleStream> children() {
- return null;
- }
-
- private StreamComparator parseComp(String sort) throws IOException {
-
- String[] sorts = sort.split(",");
- StreamComparator[] comps = new StreamComparator[sorts.length];
- for (int i = 0; i < sorts.length; i++) {
- String s = sorts[i];
-
- String[] spec = s.trim().split("\\s+"); //This should take into account spaces in the sort spec.
-
- if (spec.length != 2) {
- throw new IOException("Invalid sort spec:" + s);
- }
-
- String fieldName = spec[0].trim();
- String order = spec[1].trim();
-
- comps[i] = new FieldComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING);
- }
-
- if (comps.length > 1) {
- return new MultipleFieldComparator(comps);
- } else {
- return comps[0];
- }
- }
-
- @Override
- public void open() throws IOException {
- exportBuffers = (ExportBuffers) context.get(EXPORT_BUFFERS_KEY);
- buffer = exportBuffers.getOutputBuffer();
- }
-
- @Override
- public void close() throws IOException {
- exportBuffers = null;
- }
-
- @Override
- public Tuple read() throws IOException {
- if (pos < 0) {
- try {
- buffer.outDocsIndex = Buffer.EMPTY;
- log.info("--- ews exchange empty buffer " + buffer);
- buffer = exportBuffers.exchanger.exchange(buffer);
- log.info("--- ews got new output buffer " + buffer);
- } catch (InterruptedException e) {
- log.info("--- ews interrupt");
- Thread.currentThread().interrupt();
- throw new IOException("interrupted");
- }
- if (buffer.outDocsIndex == Buffer.NO_MORE_DOCS) {
- log.info("--- ews EOF");
- return Tuple.EOF();
- } else {
- pos = buffer.outDocsIndex;
- log.info("--- ews new pos=" + pos);
- }
- }
- if (pos < 0) {
- log.info("--- ews EOF?");
- return Tuple.EOF();
- }
- Tuple tuple = new Tuple(buffer.outDocs[pos]);
- pos--;
- return tuple;
- }
-
- @Override
- public StreamComparator getStreamSort() {
- return streamComparator;
- }
-
- @Override
- public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
- StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
- return expression;
- }
-
- @Override
- public Explanation toExplanation(StreamFactory factory) throws IOException {
- return new StreamExplanation(getStreamNodeId().toString())
- .withFunctionName("input")
- .withImplementingClass(this.getClass().getName())
- .withExpressionType(Explanation.ExpressionType.STREAM_SOURCE)
- .withExpression("--non-expressible--");
- }
- }
-
public ExportWriter(SolrQueryRequest req, SolrQueryResponse res, String wt, StreamContext initialStreamContext) {
this.req = req;
@@ -397,7 +264,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
m.put("responseHeader", singletonMap("status", 0));
m.put("response", (MapWriter) mw -> {
mw.put("numFound", totalHits);
- mw.put("docs", (IteratorWriter) iw -> writeDocs(req, iw, sort));
+ mw.put("docs", (IteratorWriter) iw -> writeDocs(req, os, iw, sort));
});
});
if (streamContext != null) {
@@ -432,7 +299,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
}
protected void transferBatchToBufferForOutput(SortQueue queue, SortDoc[] outDocs,
- List<LeafReaderContext> leaves, Buffer destination) throws IOException {
+ List<LeafReaderContext> leaves, ExportBuffers.Buffer destination) throws IOException {
int outDocsIndex = -1;
for (int i = 0; i < queue.maxSize; i++) {
SortDoc s = queue.pop();
@@ -444,199 +311,14 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
materializeDocs(leaves, outDocs, destination);
}
- private static final class Buffer implements EntryWriter {
- static final int EMPTY = -1;
- static final int NO_MORE_DOCS = -2;
-
- int outDocsIndex = EMPTY;
- // use array-of-arrays instead of Map to conserve space
- Object[][] outDocs;
- int pos = EMPTY;
-
- EntryWriter getEntryWriter(int pos, int numFields) {
- if (outDocs == null) {
- outDocs = new Object[outDocsIndex + 1][];
- }
- this.pos = pos;
- Object[] fields = new Object[numFields << 1];
- outDocs[pos] = fields;
- return this;
- }
-
- @Override
- public EntryWriter put(CharSequence k, Object v) throws IOException {
- if (pos < 0) {
- throw new IOException("Invalid entry position");
- }
- Object[] fields = outDocs[pos];
- boolean putOk = false;
- for (int i = 0; i < fields.length; i += 2) {
- if (fields[i] == null || fields[i].equals(k)) {
- fields[i] = k;
- // convert everything complex into POJOs at this point
- if (v instanceof IteratorWriter) {
- List lst = new ArrayList();
- ((IteratorWriter)v).toList(lst);
- v = lst;
- } else if (v instanceof MapWriter) {
- Map<String, Object> map = new HashMap<>();
- ((MapWriter)v).toMap(map);
- v = map;
- }
- fields[i + 1] = v;
- putOk = true;
- break;
- }
- }
- if (!putOk) {
- throw new IOException("should not happen! pos=" + pos + " ran out of space for field " + k + "=" + v
- + " - already full: " + Arrays.toString(fields));
- }
- return this;
- }
-
- public void writeItem(int pos, IteratorWriter.ItemWriter itemWriter) throws IOException {
- final Object[] fields = outDocs[pos];
- if (fields == null) {
- log.info("--- writeItem no fields at " + pos);
- return;
- }
- itemWriter.add((MapWriter) ew -> {
- for (int i = 0; i < fields.length; i += 2) {
- if (fields[i] == null) {
- log.info("--- writeItem empty field at " + pos + "/" + i);
- continue;
- }
- log.info("--- writeItem field " + pos + "/" + i);
- ew.put((CharSequence)fields[i], fields[i + 1]);
- log.info("--- writeItem done field " + pos + "/" + i);
- }
- });
- }
-
- public void reset() {
- outDocsIndex = EMPTY;
- pos = EMPTY;
- outDocs = null;
- }
-
- @Override
- public String toString() {
- return "Buffer@" + Integer.toHexString(hashCode()) + "{" +
- "outDocsIndex=" + outDocsIndex +
- '}';
- }
- }
-
- /* Helper class implementing a "double buffering" producer / consumer. */
- private static final class ExportBuffers {
- static final long EXCHANGE_TIMEOUT_SECONDS = 300;
-
- final Buffer bufferOne;
- final Buffer bufferTwo;
- final Exchanger<Buffer> exchanger = new Exchanger<>();
- final SortDoc[] outDocs;
- final List<LeafReaderContext> leaves;
- final ExportWriter exportWriter;
- final int totalHits;
- Buffer fillBuffer;
- Buffer outputBuffer;
- Runnable filler;
- ExecutorService service;
-
- ExportBuffers (ExportWriter exportWriter, List<LeafReaderContext> leaves, SolrIndexSearcher searcher, Sort sort, int queueSize, int totalHits) {
- this.exportWriter = exportWriter;
- this.leaves = leaves;
- this.outDocs = new SortDoc[queueSize];
- this.bufferOne = new Buffer();
- this.bufferTwo = new Buffer();
- this.totalHits = totalHits;
- fillBuffer = bufferOne;
- outputBuffer = bufferTwo;
- filler = () -> {
- try {
- SortDoc sortDoc = exportWriter.getSortDoc(searcher, sort.getSort());
- SortQueue queue = new SortQueue(queueSize, sortDoc);
- for (int count = 0; count < totalHits; ) {
- log.info("--- filler fillOutDocs in " + fillBuffer);
- exportWriter.fillOutDocs(leaves, sortDoc, queue, outDocs, fillBuffer);
- count += (fillBuffer.outDocsIndex + 1);
- log.info("--- filler count=" + count + ", exchange buffer from " + fillBuffer);
- fillBuffer = exchanger.exchange(fillBuffer, EXCHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
- log.info("--- filler got empty buffer " + fillBuffer);
- }
- fillBuffer.outDocsIndex = Buffer.NO_MORE_DOCS;
- log.info("--- filler final exchange buffer from " + fillBuffer);
- fillBuffer = exchanger.exchange(fillBuffer, EXCHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
- log.info("--- filler final got buffer " + fillBuffer);
- } catch (Exception e) {
- log.error("filler", e);
- shutdownNow();
- }
- };
- }
-
- public Buffer getOutputBuffer() {
- return outputBuffer;
- }
-
- public Buffer exchange(Buffer buffer) throws InterruptedException, TimeoutException {
- return exchanger.exchange(buffer, EXCHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
- }
-
- public void shutdownNow() {
- if (service != null) {
- log.info("--- shutting down buffers");
- service.shutdownNow();
- }
- }
-
- /**
- * Start processing and block until complete or Exception is thrown.
- * @param writer writer that exchanges and processes buffers received from a producer.
- * @throws IOException on errors
- */
- public void run(Callable<Boolean> writer) throws IOException {
- service = ExecutorUtil.newMDCAwareFixedThreadPool(2, new SolrNamedThreadFactory("ExportBuffers"));
- try {
- CompletableFuture.runAsync(filler, service);
- writer.call();
-
- // alternatively we could run the writer in a separate thread too:
-// CompletableFuture<Void> allDone = CompletableFuture.allOf(
-// CompletableFuture.runAsync(filler, service),
-// CompletableFuture.runAsync(() -> {
-// try {
-// writer.call();
-// } catch (Exception e) {
-// log.error("writer", e);
-// shutdownNow();
-// }
-// }, service)
-// );
-// allDone.join();
- log.info("-- finished.");
- } catch (Exception e) {
- log.error("Exception running filler / writer", e);
- //
- } finally {
- log.info("--- all done, shutting down buffers");
- service.shutdownNow();
- }
- }
- }
-
- protected void writeDocs(SolrQueryRequest req, IteratorWriter.ItemWriter writer, Sort sort) throws IOException {
+ protected void writeDocs(SolrQueryRequest req, OutputStream os, IteratorWriter.ItemWriter writer, Sort sort) throws IOException {
List<LeafReaderContext> leaves = req.getSearcher().getTopReaderContext().leaves();
- SortDoc sortDoc = getSortDoc(req.getSearcher(), sort.getSort());
final int queueSize = Math.min(DOCUMENT_BATCH_SIZE, totalHits);
- ExportBuffers buffers = new ExportBuffers(this, leaves, req.getSearcher(), sort, queueSize, totalHits);
+ ExportBuffers buffers = new ExportBuffers(this, leaves, req.getSearcher(), os, writer, sort, queueSize, totalHits);
if (streamExpression != null) {
- streamContext.put(EXPORT_WRITER_KEY, this);
- streamContext.put(EXPORT_BUFFERS_KEY, buffers);
- streamContext.put(LEAF_READERS_KEY, leaves);
+ streamContext.put(ExportBuffers.EXPORT_BUFFERS_KEY, buffers);
final TupleStream tupleStream = createTupleStream();
tupleStream.open();
buffers.run(() -> {
@@ -651,23 +333,29 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
if (t.EOF) {
break;
}
- writer.add((MapWriter) ew -> t.writeMap(ew));
+ // use decorated writer to monitor the number of output writes
+ // and flush the output quickly in case of very few (reduced) output items
+ buffers.getWriter().add((MapWriter) ew -> t.writeMap(ew));
}
return true;
});
tupleStream.close();
} else {
buffers.run(() -> {
- Buffer buffer = buffers.getOutputBuffer();
+ // get the initial buffer
+ ExportBuffers.Buffer buffer = buffers.getOutputBuffer();
log.info("--- writer init exchanging from " + buffer);
buffer = buffers.exchange(buffer);
log.info("--- writer init got " + buffer);
- while (buffer.outDocsIndex != Buffer.NO_MORE_DOCS) {
+ while (buffer.outDocsIndex != ExportBuffers.Buffer.NO_MORE_DOCS) {
if (Thread.currentThread().isInterrupted()) {
log.info("--- writer interrupted");
break;
}
for (int i = buffer.outDocsIndex; i >= 0; --i) {
+ // we're using the raw writer here because there's no potential
+ // reduction in the number of output items, unlike when using
+ // streaming expressions
buffer.writeItem(i, writer);
}
log.info("--- writer exchanging from " + buffer);
@@ -679,13 +367,13 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
}
}
- private void fillOutDocs(List<LeafReaderContext> leaves, SortDoc sortDoc,
- SortQueue sortQueue, SortDoc[] outDocs, Buffer buffer) throws IOException {
+ void fillOutDocs(List<LeafReaderContext> leaves, SortDoc sortDoc,
+ SortQueue sortQueue, SortDoc[] outDocs, ExportBuffers.Buffer buffer) throws IOException {
identifyLowestSortingUnexportedDocs(leaves, sortDoc, sortQueue);
transferBatchToBufferForOutput(sortQueue, outDocs, leaves, buffer);
}
- private void materializeDocs(List<LeafReaderContext> leaves, SortDoc[] outDocs, Buffer buffer) throws IOException {
+ private void materializeDocs(List<LeafReaderContext> leaves, SortDoc[] outDocs, ExportBuffers.Buffer buffer) throws IOException {
log.info("--- materialize docs in " + buffer);
if (buffer.outDocsIndex < 0) {
return;
@@ -785,7 +473,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
return writers;
}
- private SortDoc getSortDoc(SolrIndexSearcher searcher, SortField[] sortFields) throws IOException {
+ SortDoc getSortDoc(SolrIndexSearcher searcher, SortField[] sortFields) throws IOException {
SortValue[] sortValues = new SortValue[sortFields.length];
IndexSchema schema = searcher.getSchema();
for (int i = 0; i < sortFields.length; ++i) {
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java
new file mode 100644
index 0000000..0a783a3
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriterStream.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.export;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.StreamContext;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stream implementation that helps supporting 'expr' streaming in export writer.
+ * <p>Note: this class is made public only to allow access from {@link org.apache.solr.handler.ExportHandler},
+ * it should be treated as an internal detail of implementation.</p>
+ * @lucene.experimental
+ */
+public class ExportWriterStream extends TupleStream implements Expressible {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ StreamContext context;
+ StreamComparator streamComparator;
+ int pos = -1;
+ ExportBuffers exportBuffers;
+ ExportBuffers.Buffer buffer;
+
+ public ExportWriterStream(StreamExpression expression, StreamFactory factory) throws IOException {
+ streamComparator = parseComp(factory.getDefaultSort());
+ }
+
+ /**
+ * NOTE: this context must contain an instance of {@link ExportBuffers} under the
+ * {@link ExportBuffers#EXPORT_BUFFERS_KEY} key.
+ */
+ @Override
+ public void setStreamContext(StreamContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public List<TupleStream> children() {
+ return null;
+ }
+
+ private StreamComparator parseComp(String sort) throws IOException {
+
+ String[] sorts = sort.split(",");
+ StreamComparator[] comps = new StreamComparator[sorts.length];
+ for (int i = 0; i < sorts.length; i++) {
+ String s = sorts[i];
+
+ String[] spec = s.trim().split("\\s+"); //This should take into account spaces in the sort spec.
+
+ if (spec.length != 2) {
+ throw new IOException("Invalid sort spec:" + s);
+ }
+
+ String fieldName = spec[0].trim();
+ String order = spec[1].trim();
+
+ comps[i] = new FieldComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING);
+ }
+
+ if (comps.length > 1) {
+ return new MultipleFieldComparator(comps);
+ } else {
+ return comps[0];
+ }
+ }
+
+ @Override
+ public void open() throws IOException {
+ exportBuffers = (ExportBuffers) context.get(ExportBuffers.EXPORT_BUFFERS_KEY);
+ buffer = exportBuffers.getOutputBuffer();
+ }
+
+ @Override
+ public void close() throws IOException {
+ exportBuffers = null;
+ }
+
+ @Override
+ public Tuple read() throws IOException {
+ if (pos < 0) {
+ try {
+ buffer.outDocsIndex = ExportBuffers.Buffer.EMPTY;
+ log.info("--- ews exchange empty buffer " + buffer);
+ buffer = exportBuffers.exchanger.exchange(buffer);
+ log.info("--- ews got new output buffer " + buffer);
+ } catch (InterruptedException e) {
+ log.info("--- ews interrupt");
+ Thread.currentThread().interrupt();
+ throw new IOException("interrupted");
+ }
+ if (buffer.outDocsIndex == ExportBuffers.Buffer.NO_MORE_DOCS) {
+ log.info("--- ews EOF");
+ return Tuple.EOF();
+ } else {
+ pos = buffer.outDocsIndex;
+ log.info("--- ews new pos=" + pos);
+ }
+ }
+ if (pos < 0) {
+ log.info("--- ews EOF?");
+ return Tuple.EOF();
+ }
+ Tuple tuple = new Tuple(buffer.outDocs[pos]);
+ pos--;
+ return tuple;
+ }
+
+ @Override
+ public StreamComparator getStreamSort() {
+ return streamComparator;
+ }
+
+ @Override
+ public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+ StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+ return expression;
+ }
+
+ @Override
+ public Explanation toExplanation(StreamFactory factory) throws IOException {
+ return new StreamExplanation(getStreamNodeId().toString())
+ .withFunctionName("input")
+ .withImplementingClass(this.getClass().getName())
+ .withExpressionType(Explanation.ExpressionType.STREAM_SOURCE)
+ .withExpression("--non-expressible--");
+ }
+}