You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by dw...@apache.org on 2021/03/10 09:45:03 UTC
[lucene] 03/03: SOLR-15210: Fix the critical bugs. Existing tests are passing now.
This is an automated email from the ASF dual-hosted git repository.
dweiss pushed a commit to branch jira/solr-15210
in repository https://gitbox.apache.org/repos/asf/lucene.git
commit fd85163e05f0d5a08ee869790e1e097e2e5f45d1
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Mon Mar 8 15:22:10 2021 +0100
SOLR-15210: Fix the critical bugs. Existing tests are passing now.
---
.../apache/solr/handler/export/ExportWriter.java | 74 ++++++++++++++--------
.../client/solrj/io/stream/ParallelStream.java | 58 ++++++++---------
.../solr/client/solrj/io/stream/SolrStream.java | 21 +++---
3 files changed, 89 insertions(+), 64 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
index 9622677..087a1b0 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
@@ -33,7 +33,6 @@ import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
-import org.apache.lucene.util.BitSet;
import org.apache.lucene.util.BitSetIterator;
import org.apache.lucene.util.FixedBitSet;
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
@@ -112,6 +111,12 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
final String metricsPath;
//The batch size for the output writer thread.
final int batchSize;
+ final private String wt;
+ final int numWorkers;
+ final int workerId;
+ final String fieldList;
+ final List<String> partitionKeys;
+ final String partitionCacheKey;
//The max combined size of the segment level priority queues.
private int priorityQueueSize;
StreamExpression streamExpression;
@@ -120,21 +125,16 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
int totalHits = 0;
FixedBitSet[] sets = null;
PushWriter writer;
- final private String wt;
- final int numWorkers;
- final int workerId;
- final String fieldList;
- final List<String> partitionKeys;
- final String partitionCacheKey;
// per-segment caches for already populated partitioning filters when parallel() is in use
final SolrCache<IndexReader.CacheKey, SolrCache<String, FixedBitSet>> partitionCaches;
- // per-segment partitioning filters that are incomplete (still being updated from the current request)
+ // local per-segment partitioning filters that are incomplete (still being updated from the current request)
final Map<IndexReader.CacheKey, FixedBitSet> tempPartitionCaches;
+ @SuppressWarnings("unchecked")
public ExportWriter(SolrQueryRequest req, SolrQueryResponse res, String wt,
StreamContext initialStreamContext, SolrMetricsContext solrMetricsContext,
String metricsPath) throws Exception {
@@ -164,7 +164,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
}
this.fieldList = req.getParams().get(CommonParams.FL);
this.batchSize = DEFAULT_BATCH_SIZE;
- this.partitionCaches = req.getSearcher().getCache(SOLR_CACHE_KEY);
+ this.partitionCaches = (SolrCache<IndexReader.CacheKey, SolrCache<String, FixedBitSet>>)req.getSearcher().getCache(SOLR_CACHE_KEY);
}
@Override
@@ -299,7 +299,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
return;
}
- outputDoc = new DoubleArrayMapWriter(fieldWriters.length);
+ outputDoc = new OutputDocMapWriter(fields, partitionKeys);
String expr = params.get(StreamParams.EXPR);
if (expr != null) {
@@ -526,15 +526,32 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
}
}
- // not sure about this class - it reduces object allocation but has linear cost of get()
- private static final class DoubleArrayMapWriter implements MapWriter, EntryWriter {
+ // not sure about this class - it somewhat reduces object allocation as compared to LinkedHashMap
+ // NOTE: the lookup of values associated with partition keys uses an int lookup table that is
+ // indexed by the ord of the partition key in the list of partition keys.
+ private static final class OutputDocMapWriter implements MapWriter, EntryWriter {
final CharSequence[] keys;
final Object[] values;
+ final int[] partitionKeyToFieldIdx;
int pos;
- DoubleArrayMapWriter(int size) {
- keys = new CharSequence[size];
- values = new Object[size];
+ OutputDocMapWriter(String[] fields, List<String> partitionKeys) {
+ keys = new CharSequence[fields.length];
+ values = new Object[fields.length];
+ if (partitionKeys != null) {
+ partitionKeyToFieldIdx = new int[partitionKeys.size()];
+OUTER: for (int keyIdx = 0; keyIdx < partitionKeys.size(); keyIdx++) {
+ for (int fieldIdx = 0; fieldIdx < fields.length; fieldIdx++) {
+ if (fields[fieldIdx].equals(partitionKeys.get(keyIdx))) {
+ partitionKeyToFieldIdx[keyIdx] = fieldIdx;
+ continue OUTER;
+ }
+ }
+ partitionKeyToFieldIdx[keyIdx] = -1;
+ }
+ } else {
+ partitionKeyToFieldIdx = null;
+ }
pos = 0;
}
@@ -561,18 +578,23 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
}
}
- public Object get(CharSequence key) {
- for (int i = 0; i < pos; i++) {
- if (keys[i].equals(key)) {
- return values[i];
- }
+ /**
+ * Get the value associated with the partition key
+ * @param keyIdx index of the partition key in the list of keys
+ * @return associated value or null if missing
+ */
+ public Object get(int keyIdx) {
+ final int fieldIdx = partitionKeyToFieldIdx[keyIdx];
+ if (fieldIdx == -1) {
+ return null;
+ } else {
+ return values[fieldIdx];
}
- return null;
}
}
// we materialize this document so that we can potentially do hash partitioning
- private DoubleArrayMapWriter outputDoc;
+ private OutputDocMapWriter outputDoc;
// WARNING: single-thread only! shared var outputDoc
MapWriter fillOutputDoc(SortDoc sortDoc,
@@ -588,7 +610,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
++fieldIndex;
}
}
- if (partitionKeys != null) {
+ if (partitionKeys == null) {
return outputDoc;
} else {
// if we use partitioning then filter out unwanted docs
@@ -596,11 +618,11 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
}
}
- MapWriter partitionFilter(SortDoc sortDoc, LeafReaderContext leaf, DoubleArrayMapWriter doc) {
+ MapWriter partitionFilter(SortDoc sortDoc, LeafReaderContext leaf, OutputDocMapWriter doc) {
// calculate hash
int hash = 0;
- for (String key : partitionKeys) {
- Object value = doc.get(key);
+ for (int keyIdx = 0; keyIdx < partitionKeys.size(); keyIdx++) {
+ Object value = doc.get(keyIdx);
if (value != null) {
hash += value.hashCode();
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
index b567f86..4f08155 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
@@ -32,7 +32,9 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.StreamParams;
import static org.apache.solr.common.params.CommonParams.DISTRIB;
import static org.apache.solr.common.params.CommonParams.SORT;
@@ -86,54 +88,52 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
// validate expression contains only what we want.
- if(expression.getParameters().size() != streamExpressions.size() + 3 + (null != zkHostExpression ? 1 : 0)){
+ if (expression.getParameters().size() != streamExpressions.size() + 3 + (null != zkHostExpression ? 1 : 0)) {
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
}
// Collection Name
- if(null == collectionName){
+ if (null == collectionName) {
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
}
// Workers
- if(null == workersParam || null == workersParam.getParameter() || !(workersParam.getParameter() instanceof StreamExpressionValue)){
+ if (null == workersParam || null == workersParam.getParameter() || !(workersParam.getParameter() instanceof StreamExpressionValue)){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single 'workers' parameter of type positive integer but didn't find one",expression));
}
- String workersStr = ((StreamExpressionValue)workersParam.getParameter()).getValue();
+ String workersStr = ((StreamExpressionValue) workersParam.getParameter()).getValue();
int workersInt = 0;
try{
workersInt = Integer.parseInt(workersStr);
- if(workersInt <= 0){
+ if (workersInt <= 0) {
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - workers '%s' must be greater than 0.",expression, workersStr));
}
- }
- catch(NumberFormatException e){
+ } catch(NumberFormatException e) {
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - workers '%s' is not a valid integer.",expression, workersStr));
}
// Stream
- if(1 != streamExpressions.size()){
+ if (1 != streamExpressions.size()) {
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
}
// Sort
- if(null == sortExpression || !(sortExpression.getParameter() instanceof StreamExpressionValue)){
+ if (null == sortExpression || !(sortExpression.getParameter() instanceof StreamExpressionValue)) {
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'sort' parameter telling us how to join the parallel streams but didn't find one",expression));
}
// zkHost, optional - if not provided then will look into factory list to get
String zkHost = null;
- if(null == zkHostExpression){
+ if (null == zkHostExpression) {
zkHost = factory.getCollectionZkHost(collectionName);
- if(zkHost == null) {
+ if (zkHost == null) {
zkHost = factory.getDefaultZkHost();
}
- }
- else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
+ } else if (zkHostExpression.getParameter() instanceof StreamExpressionValue) {
zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
}
- if(null == zkHost){
- throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
+ if (null == zkHost) {
+ throw new IOException(String.format(Locale.ROOT, "invalid expression %s - zkHost not found for collection '%s'", expression, collectionName));
}
// We've got all the required items
@@ -143,7 +143,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
init(zkHost,collectionName,stream,workersInt,comp);
}
- private void init(String zkHost,String collection,TupleStream tupleStream,int workers,StreamComparator comp) throws IOException{
+ private void init(String zkHost, String collection, TupleStream tupleStream, int workers, StreamComparator comp) throws IOException{
this.zkHost = zkHost;
this.collection = collection;
this.workers = workers;
@@ -151,7 +151,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
this.tupleStream = tupleStream;
// requires Expressible stream and comparator
- if(! (tupleStream instanceof Expressible)){
+ if (! (tupleStream instanceof Expressible)) {
throw new IOException("Unable to create ParallelStream with a non-expressible TupleStream.");
}
}
@@ -171,15 +171,13 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
// workers
expression.addParameter(new StreamExpressionNamedParameter("workers", Integer.toString(workers)));
- if(includeStreams){
- if(tupleStream instanceof Expressible){
- expression.addParameter(((Expressible)tupleStream).toExpression(factory));
- }
- else{
+ if (includeStreams) {
+ if (tupleStream instanceof Expressible) {
+ expression.addParameter(((Expressible) tupleStream).toExpression(factory));
+ } else {
throw new IOException("This ParallelStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
- }
- else{
+ } else {
expression.addParameter("<stream>");
}
@@ -203,7 +201,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
explanation.setExpression(toExpression(factory, false).toString());
// add a child for each worker
- for(int idx = 0; idx < workers; ++idx){
+ for (int idx = 0; idx < workers; ++idx) {
explanation.addChild(tupleStream.toExplanation(factory));
}
@@ -219,7 +217,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
public Tuple read() throws IOException {
Tuple tuple = _read();
- if(tuple.EOF) {
+ if (tuple.EOF) {
/*
Map<String, Map> metrics = new HashMap();
Iterator<Entry<String,Tuple>> it = this.eofTuples.entrySet().iterator();
@@ -242,7 +240,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
public void setStreamContext(StreamContext streamContext) {
this.streamContext = streamContext;
- if(streamFactory == null) {
+ if (streamFactory == null) {
this.streamFactory = streamContext.getStreamFactory();
}
this.tupleStream.setStreamContext(streamContext);
@@ -254,15 +252,15 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);
- for(int w=0; w<workers; w++) {
+ for (int w = 0; w < workers; w++) {
ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
paramsLoc.set(DISTRIB,"false"); // We are the aggregator.
paramsLoc.set(NUM_WORKERS_PARAM, workers);
paramsLoc.set(WORKER_ID_PARAM, w);
paramsLoc.set(USE_HASH_QUERY_PARAM, false);
- paramsLoc.set("expr", pushStream.toString());
- paramsLoc.set("qt","/stream");
+ paramsLoc.set(StreamParams.EXPR, pushStream.toString());
+ paramsLoc.set(CommonParams.QT,"/stream");
String url = shardUrls.get(w);
SolrStream solrStream = new SolrStream(url, paramsLoc);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
index 58accf5..504d6a0 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
@@ -125,7 +125,7 @@ public class SolrStream extends TupleStream {
try {
SolrParams requestParams = loadParams(params);
if (!distrib) {
- ((ModifiableSolrParams) requestParams).add("distrib","false");
+ ((ModifiableSolrParams) requestParams).add(CommonParams.DISTRIB, "false");
}
tupleStreamParser = constructParser(requestParams);
} catch (Exception e) {
@@ -153,17 +153,22 @@ public class SolrStream extends TupleStream {
ModifiableSolrParams solrParams = new ModifiableSolrParams(paramsIn);
if (params.get("partitionKeys") != null) {
if (!params.get("partitionKeys").equals("none") && numWorkers > 1) {
- if (params.getBool(ParallelStream.USE_HASH_QUERY_PARAM, false)) {
+ // turn on ExportWriter partitioning only for /export and only when requested
+ String qt = params.get(CommonParams.QT);
+ if (qt != null && qt.equals("/export")) {
+ solrParams.add(ParallelStream.WORKER_ID_PARAM, String.valueOf(workerID));
+ solrParams.add(ParallelStream.NUM_WORKERS_PARAM, String.valueOf(numWorkers));
+ } else {
String partitionFilter = getPartitionFilter();
- solrParams.add("fq", partitionFilter);
+ solrParams.add(CommonParams.FQ, partitionFilter);
}
}
- } else if(numWorkers > 1) {
- throw new IOException("When numWorkers > 1 partitionKeys must be set. Set partitionKeys=none to send the entire stream to each worker.");
+ } else if (numWorkers > 1) {
+ throw new IOException("When numWorkers > 1 partitionKeys must be set. Set partitionKeys=none to send the entire stream to each worker.");
}
if (checkpoint > 0) {
- solrParams.add("fq", "{!frange cost=100 incl=false l="+checkpoint+"}_version_");
+ solrParams.add(CommonParams.FQ, "{!frange cost=100 incl=false l="+checkpoint+"}_version_");
}
return solrParams;
@@ -278,10 +283,10 @@ public class SolrStream extends TupleStream {
}
private TupleStreamParser constructParser(SolrParams requestParams) throws IOException, SolrServerException {
- String p = requestParams.get("qt");
+ String p = requestParams.get(CommonParams.QT);
if (p != null) {
ModifiableSolrParams modifiableSolrParams = (ModifiableSolrParams) requestParams;
- modifiableSolrParams.remove("qt");
+ modifiableSolrParams.remove(CommonParams.QT);
//performance optimization - remove extra whitespace by default when streaming
modifiableSolrParams.set("indent", modifiableSolrParams.get("indent", "off"));
}