Posted to commits@lucene.apache.org by dw...@apache.org on 2021/03/10 09:45:03 UTC

[lucene] 03/03: SOLR-15210: Fix the critical bugs. Existing tests are passing now.

This is an automated email from the ASF dual-hosted git repository.

dweiss pushed a commit to branch jira/solr-15210
in repository https://gitbox.apache.org/repos/asf/lucene.git

commit fd85163e05f0d5a08ee869790e1e097e2e5f45d1
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Mon Mar 8 15:22:10 2021 +0100

    SOLR-15210: Fix the critical bugs. Existing tests are passing now.
---
 .../apache/solr/handler/export/ExportWriter.java   | 74 ++++++++++++++--------
 .../client/solrj/io/stream/ParallelStream.java     | 58 ++++++++---------
 .../solr/client/solrj/io/stream/SolrStream.java    | 21 +++---
 3 files changed, 89 insertions(+), 64 deletions(-)
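
For context, this code path is driven by parallel() streaming expressions: ParallelStream pushes the wrapped expression to N worker nodes, and each worker must return a disjoint slice of the documents, split by partitionKeys. A typical client-side expression that exercises it (collection and field names here are illustrative, not taken from this commit):

    // Hypothetical usage sketch: an /export search fanned out to 2 workers,
    // partitioned on the "a_s" field. All names and values are placeholders.
    String expr = "parallel(collection1,"
        + " search(collection1, q=\"*:*\", fl=\"id,a_s\", sort=\"a_s asc\","
        + " qt=\"/export\", partitionKeys=\"a_s\"),"
        + " workers=2, sort=\"a_s asc\")";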

diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
index 9622677..087a1b0 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
@@ -33,7 +33,6 @@ import org.apache.lucene.index.SortedDocValues;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.Sort;
 import org.apache.lucene.search.SortField;
-import org.apache.lucene.util.BitSet;
 import org.apache.lucene.util.BitSetIterator;
 import org.apache.lucene.util.FixedBitSet;
 import org.apache.solr.client.solrj.impl.BinaryResponseParser;
@@ -112,6 +111,12 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
   final String metricsPath;
   //The batch size for the output writer thread.
   final int batchSize;
+  final private String wt;
+  final int numWorkers;
+  final int workerId;
+  final String fieldList;
+  final List<String> partitionKeys;
+  final String partitionCacheKey;
   //The max combined size of the segment level priority queues.
   private int priorityQueueSize;
   StreamExpression streamExpression;
@@ -120,21 +125,16 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
   int totalHits = 0;
   FixedBitSet[] sets = null;
   PushWriter writer;
-  final private String wt;
-  final int numWorkers;
-  final int workerId;
-  final String fieldList;
-  final List<String> partitionKeys;
-  final String partitionCacheKey;
 
   // per-segment caches for already populated partitioning filters when parallel() is in use
   final SolrCache<IndexReader.CacheKey, SolrCache<String, FixedBitSet>> partitionCaches;
 
-  // per-segment partitioning filters that are incomplete (still being updated from the current request)
+  // local per-segment partitioning filters that are incomplete (still being updated from the current request)
   final Map<IndexReader.CacheKey, FixedBitSet> tempPartitionCaches;
 
 
 
+  @SuppressWarnings("unchecked")
   public ExportWriter(SolrQueryRequest req, SolrQueryResponse res, String wt,
                       StreamContext initialStreamContext, SolrMetricsContext solrMetricsContext,
                       String metricsPath) throws Exception {
@@ -164,7 +164,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
     }
     this.fieldList = req.getParams().get(CommonParams.FL);
     this.batchSize = DEFAULT_BATCH_SIZE;
-    this.partitionCaches = req.getSearcher().getCache(SOLR_CACHE_KEY);
+    this.partitionCaches = (SolrCache<IndexReader.CacheKey, SolrCache<String, FixedBitSet>>)req.getSearcher().getCache(SOLR_CACHE_KEY);
   }
 
   @Override
@@ -299,7 +299,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
       return;
     }
 
-    outputDoc = new DoubleArrayMapWriter(fieldWriters.length);
+    outputDoc = new OutputDocMapWriter(fields, partitionKeys);
 
     String expr = params.get(StreamParams.EXPR);
     if (expr != null) {
@@ -526,15 +526,32 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
     }
   }
 
-  // not sure about this class - it reduces object allocation but has linear cost of get()
-  private static final class DoubleArrayMapWriter implements MapWriter, EntryWriter {
+  // not sure about this class - it somewhat reduces object allocation as compared to LinkedHashMap
+  // NOTE: the lookup of values associated with partition keys uses an int lookup table that is
+  // indexed by the ord of the partition key in the list of partition keys.
+  private static final class OutputDocMapWriter implements MapWriter, EntryWriter {
     final CharSequence[] keys;
     final Object[] values;
+    final int[] partitionKeyToFieldIdx;
     int pos;
 
-    DoubleArrayMapWriter(int size) {
-      keys = new CharSequence[size];
-      values = new Object[size];
+    OutputDocMapWriter(String[] fields, List<String> partitionKeys) {
+      keys = new CharSequence[fields.length];
+      values = new Object[fields.length];
+      if (partitionKeys != null) {
+        partitionKeyToFieldIdx = new int[partitionKeys.size()];
+OUTER:  for (int keyIdx = 0; keyIdx < partitionKeys.size(); keyIdx++) {
+          for (int fieldIdx = 0; fieldIdx < fields.length; fieldIdx++) {
+            if (fields[fieldIdx].equals(partitionKeys.get(keyIdx))) {
+              partitionKeyToFieldIdx[keyIdx] = fieldIdx;
+              continue OUTER;
+            }
+          }
+          partitionKeyToFieldIdx[keyIdx] = -1;
+        }
+      } else {
+        partitionKeyToFieldIdx = null;
+      }
       pos = 0;
     }
 
@@ -561,18 +578,23 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
       }
     }
 
-    public Object get(CharSequence key) {
-      for (int i = 0; i < pos; i++) {
-        if (keys[i].equals(key)) {
-          return values[i];
-        }
+    /**
+     * Get the value associated with the partition key
+     * @param keyIdx index of the partition key in the list of keys
+     * @return associated value or null if missing
+     */
+    public Object get(int keyIdx) {
+      final int fieldIdx = partitionKeyToFieldIdx[keyIdx];
+      if (fieldIdx == -1) {
+        return null;
+      } else {
+        return values[fieldIdx];
       }
-      return null;
     }
   }
 
   // we materialize this document so that we can potentially do hash partitioning
-  private DoubleArrayMapWriter outputDoc;
+  private OutputDocMapWriter outputDoc;
 
   // WARNING: single-thread only! shared var outputDoc
   MapWriter fillOutputDoc(SortDoc sortDoc,
@@ -588,7 +610,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
         ++fieldIndex;
       }
     }
-    if (partitionKeys != null) {
+    if (partitionKeys == null) {
       return outputDoc;
     } else {
       // if we use partitioning then filter out unwanted docs
@@ -596,11 +618,11 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
     }
   }
 
-  MapWriter partitionFilter(SortDoc sortDoc, LeafReaderContext leaf, DoubleArrayMapWriter doc) {
+  MapWriter partitionFilter(SortDoc sortDoc, LeafReaderContext leaf, OutputDocMapWriter doc) {
     // calculate hash
     int hash = 0;
-    for (String key : partitionKeys) {
-      Object value = doc.get(key);
+    for (int keyIdx = 0; keyIdx < partitionKeys.size(); keyIdx++) {
+      Object value = doc.get(keyIdx);
       if (value != null) {
         hash += value.hashCode();
       }
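
A note on the ExportWriter changes above: the partitionFilter() hunk ends before the computed hash is consumed, but given the numWorkers and workerId fields added by this commit, the document is presumably kept only when its hash falls into this worker's slot. A minimal sketch under that assumption (the method name and the modulo step are illustrative, not from the diff):

    // Hypothetical continuation of partitionFilter(): keep a document only if
    // its summed partition-key hash maps to this worker. Math.floorMod guards
    // against negative values when the summed hashCodes overflow.
    static boolean keepForThisWorker(int hash, int numWorkers, int workerId) {
      return Math.floorMod(hash, numWorkers) == workerId;
    }

Note also the design choice in OutputDocMapWriter: get(keyIdx) replaces the old per-key linear scan over all output fields with a single array lookup through partitionKeyToFieldIdx, which matters because the filter runs once per exported document.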
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
index b567f86..4f08155 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
@@ -32,7 +32,9 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.StreamParams;
 
 import static org.apache.solr.common.params.CommonParams.DISTRIB;
 import static org.apache.solr.common.params.CommonParams.SORT;
@@ -86,54 +88,52 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
     
     // validate expression contains only what we want.
 
-    if(expression.getParameters().size() != streamExpressions.size() + 3 + (null != zkHostExpression ? 1 : 0)){
+    if (expression.getParameters().size() != streamExpressions.size() + 3 + (null != zkHostExpression ? 1 : 0)) {
       throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
     }
     
     // Collection Name
-    if(null == collectionName){
+    if (null == collectionName) {
       throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
     }
 
     // Workers
-    if(null == workersParam || null == workersParam.getParameter() || !(workersParam.getParameter() instanceof StreamExpressionValue)){
+    if (null == workersParam || null == workersParam.getParameter() || !(workersParam.getParameter() instanceof StreamExpressionValue)){
       throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single 'workers' parameter of type positive integer but didn't find one",expression));
     }
-    String workersStr = ((StreamExpressionValue)workersParam.getParameter()).getValue();
+    String workersStr = ((StreamExpressionValue) workersParam.getParameter()).getValue();
     int workersInt = 0;
     try{
       workersInt = Integer.parseInt(workersStr);
-      if(workersInt <= 0){
+      if (workersInt <= 0) {
         throw new IOException(String.format(Locale.ROOT,"invalid expression %s - workers '%s' must be greater than 0.",expression, workersStr));
       }
-    }
-    catch(NumberFormatException e){
+    } catch(NumberFormatException e) {
       throw new IOException(String.format(Locale.ROOT,"invalid expression %s - workers '%s' is not a valid integer.",expression, workersStr));
     }    
 
     // Stream
-    if(1 != streamExpressions.size()){
+    if (1 != streamExpressions.size()) {
       throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
     }
     
     // Sort
-    if(null == sortExpression || !(sortExpression.getParameter() instanceof StreamExpressionValue)){
+    if (null == sortExpression || !(sortExpression.getParameter() instanceof StreamExpressionValue)) {
       throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'sort' parameter telling us how to join the parallel streams but didn't find one",expression));
     }
     
     // zkHost, optional - if not provided then will look into factory list to get
     String zkHost = null;
-    if(null == zkHostExpression){
+    if (null == zkHostExpression) {
       zkHost = factory.getCollectionZkHost(collectionName);
-      if(zkHost == null) {
+      if (zkHost == null) {
         zkHost = factory.getDefaultZkHost();
       }
-    }
-    else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
+    } else if (zkHostExpression.getParameter() instanceof StreamExpressionValue) {
       zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
     }
-    if(null == zkHost){
-      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
+    if (null == zkHost) {
+      throw new IOException(String.format(Locale.ROOT, "invalid expression %s - zkHost not found for collection '%s'", expression, collectionName));
     }
     
     // We've got all the required items    
@@ -143,7 +143,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
     init(zkHost,collectionName,stream,workersInt,comp);
   }
 
-  private void init(String zkHost,String collection,TupleStream tupleStream,int workers,StreamComparator comp) throws IOException{
+  private void init(String zkHost, String collection, TupleStream tupleStream, int workers, StreamComparator comp) throws IOException{
     this.zkHost = zkHost;
     this.collection = collection;
     this.workers = workers;
@@ -151,7 +151,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
     this.tupleStream = tupleStream;
 
     // requires Expressible stream and comparator
-    if(! (tupleStream instanceof Expressible)){
+    if (! (tupleStream instanceof Expressible)) {
       throw new IOException("Unable to create ParallelStream with a non-expressible TupleStream.");
     }
   }
@@ -171,15 +171,13 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
     // workers
     expression.addParameter(new StreamExpressionNamedParameter("workers", Integer.toString(workers)));
     
-    if(includeStreams){
-      if(tupleStream instanceof Expressible){
-        expression.addParameter(((Expressible)tupleStream).toExpression(factory));
-      }
-      else{
+    if (includeStreams) {
+      if (tupleStream instanceof Expressible) {
+        expression.addParameter(((Expressible) tupleStream).toExpression(factory));
+      } else {
         throw new IOException("This ParallelStream contains a non-expressible TupleStream - it cannot be converted to an expression");
       }
-    }
-    else{
+    } else {
       expression.addParameter("<stream>");
     }
         
@@ -203,7 +201,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
     explanation.setExpression(toExpression(factory, false).toString());
     
     // add a child for each worker
-    for(int idx = 0; idx < workers; ++idx){
+    for (int idx = 0; idx < workers; ++idx) {
       explanation.addChild(tupleStream.toExplanation(factory));
     }
     
@@ -219,7 +217,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
   public Tuple read() throws IOException {
     Tuple tuple = _read();
 
-    if(tuple.EOF) {
+    if (tuple.EOF) {
       /*
       Map<String, Map> metrics = new HashMap();
       Iterator<Entry<String,Tuple>> it = this.eofTuples.entrySet().iterator();
@@ -242,7 +240,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
 
   public void setStreamContext(StreamContext streamContext) {
     this.streamContext = streamContext;
-    if(streamFactory == null) {
+    if (streamFactory == null) {
       this.streamFactory = streamContext.getStreamFactory();
     }
     this.tupleStream.setStreamContext(streamContext);
@@ -254,15 +252,15 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
 
       List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);
 
-      for(int w=0; w<workers; w++) {
+      for (int w = 0; w < workers; w++) {
         ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
         paramsLoc.set(DISTRIB,"false"); // We are the aggregator.
         paramsLoc.set(NUM_WORKERS_PARAM, workers);
         paramsLoc.set(WORKER_ID_PARAM, w);
         paramsLoc.set(USE_HASH_QUERY_PARAM, false);
 
-        paramsLoc.set("expr", pushStream.toString());
-        paramsLoc.set("qt","/stream");
+        paramsLoc.set(StreamParams.EXPR, pushStream.toString());
+        paramsLoc.set(CommonParams.QT,"/stream");
 
         String url = shardUrls.get(w);
         SolrStream solrStream = new SolrStream(url, paramsLoc);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
index 58accf5..504d6a0 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
@@ -125,7 +125,7 @@ public class SolrStream extends TupleStream {
     try {
       SolrParams requestParams = loadParams(params);
       if (!distrib) {
-        ((ModifiableSolrParams) requestParams).add("distrib","false");
+        ((ModifiableSolrParams) requestParams).add(CommonParams.DISTRIB, "false");
       }
       tupleStreamParser = constructParser(requestParams);
     } catch (Exception e) {
@@ -153,17 +153,22 @@ public class SolrStream extends TupleStream {
     ModifiableSolrParams solrParams = new ModifiableSolrParams(paramsIn);
     if (params.get("partitionKeys") != null) {
       if (!params.get("partitionKeys").equals("none") && numWorkers > 1) {
-        if (params.getBool(ParallelStream.USE_HASH_QUERY_PARAM, false)) {
+        // turn on ExportWriter partitioning only for /export and only when requested
+        String qt = params.get(CommonParams.QT);
+        if (qt != null && qt.equals("/export")) {
+          solrParams.add(ParallelStream.WORKER_ID_PARAM, String.valueOf(workerID));
+          solrParams.add(ParallelStream.NUM_WORKERS_PARAM, String.valueOf(numWorkers));
+        } else {
           String partitionFilter = getPartitionFilter();
-          solrParams.add("fq", partitionFilter);
+          solrParams.add(CommonParams.FQ, partitionFilter);
         }
       }
-    } else if(numWorkers > 1) {
-        throw new IOException("When numWorkers > 1 partitionKeys must be set. Set partitionKeys=none to send the entire stream to each worker.");
+    } else if (numWorkers > 1) {
+      throw new IOException("When numWorkers > 1 partitionKeys must be set. Set partitionKeys=none to send the entire stream to each worker.");
     }
 
     if (checkpoint > 0) {
-      solrParams.add("fq", "{!frange cost=100 incl=false l="+checkpoint+"}_version_");
+      solrParams.add(CommonParams.FQ, "{!frange cost=100 incl=false l="+checkpoint+"}_version_");
     }
 
     return solrParams;
@@ -278,10 +283,10 @@ public class SolrStream extends TupleStream {
   }
 
   private TupleStreamParser constructParser(SolrParams requestParams) throws IOException, SolrServerException {
-    String p = requestParams.get("qt");
+    String p = requestParams.get(CommonParams.QT);
     if (p != null) {
       ModifiableSolrParams modifiableSolrParams = (ModifiableSolrParams) requestParams;
-      modifiableSolrParams.remove("qt");
+      modifiableSolrParams.remove(CommonParams.QT);
       //performance optimization - remove extra whitespace by default when streaming
       modifiableSolrParams.set("indent", modifiableSolrParams.get("indent", "off"));
     }
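
For reference, on the non-/export branch of loadParams() above, getPartitionFilter() (untouched by this commit, so the body below is an assumption about its shape rather than part of the diff) produces a filter query for Solr's hash query parser, keeping only the documents whose partitionKeys hash into this worker's slot:

    // Assumed shape of the fq added on the non-/export path; {!hash} is
    // Solr's HashQParserPlugin, with one slot per worker.
    private String getPartitionFilter() {
      return "{!hash workers=" + numWorkers + " worker=" + workerID + "}";
    }

The net effect of the SolrStream change is that /export requests now skip this query-time hash filter and instead forward workerID and numWorkers, letting ExportWriter apply (and cache) the partitioning per segment, as shown in the first file of this commit.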