You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2020/05/22 18:34:51 UTC

[lucene-solr] branch jira/solr-14470 updated: SOLR-14470: Add CubeStream

This is an automated email from the ASF dual-hosted git repository.

jbernste pushed a commit to branch jira/solr-14470
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/jira/solr-14470 by this push:
     new 847b73a  SOLR-14470: Add CubeStream
847b73a is described below

commit 847b73aeb500351b993a38e49e85ee927a3dd24c
Author: Joel Bernstein <jb...@apache.org>
AuthorDate: Fri May 22 14:34:28 2020 -0400

    SOLR-14470: Add CubeStream
---
 .../apache/solr/handler/export/ExportWriter.java   |   9 +-
 .../solr/handler/export/StringFieldWriter.java     |   2 +-
 .../java/org/apache/solr/client/solrj/io/Lang.java | 100 ++++++++
 .../solr/client/solrj/io/stream/CubeStream.java    | 285 +++++++++++++++++++++
 .../client/solrj/io/stream/expr/StreamFactory.java |  21 ++
 5 files changed, 412 insertions(+), 5 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
index 5c556ca..725c528 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
@@ -153,7 +153,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
     final TupleEntryWriter entryWriter = new TupleEntryWriter();
 
     public ExportWriterStream(StreamExpression expression, StreamFactory factory) throws IOException {
-
+      streamComparator = parseComp(factory.getDefaultSort());
     }
 
     @Override
@@ -194,8 +194,6 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
 
     @Override
     public void open() throws IOException {
-      String sort = (String)context.get(CommonParams.SORT);
-      streamComparator = parseComp(sort);
       docs = (SortDoc[]) context.get(SORT_DOCS_KEY);
       queue = (SortQueue) context.get(SORT_QUEUE_KEY);
       sortDoc = (SortDoc) context.get(SORT_DOC_KEY);
@@ -379,6 +377,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
     String expr = params.get(StreamParams.EXPR);
     if (expr != null) {
       StreamFactory streamFactory = initialStreamContext.getStreamFactory();
+      streamFactory.withDefaultSort(params.get(CommonParams.SORT));
       try {
         StreamExpression expression = StreamExpressionParser.parse(expr);
         if (streamFactory.isEvaluator(expression)) {
@@ -419,7 +418,9 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
   }
 
   private TupleStream createTupleStream() throws IOException {
-    StreamFactory streamFactory = initialStreamContext.getStreamFactory();
+    StreamFactory streamFactory = (StreamFactory)initialStreamContext.getStreamFactory().clone();
+    //Set the sort in the stream factory so it can be used during initialization.
+    streamFactory.withDefaultSort(((String)streamContext.get(CommonParams.SORT)));
     TupleStream tupleStream = streamFactory.constructStream(streamExpression);
     tupleStream.setStreamContext(streamContext);
     return tupleStream;
diff --git a/solr/core/src/java/org/apache/solr/handler/export/StringFieldWriter.java b/solr/core/src/java/org/apache/solr/handler/export/StringFieldWriter.java
index 6d5e34c..b82c365 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/StringFieldWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/StringFieldWriter.java
@@ -86,7 +86,7 @@ class StringFieldWriter extends FieldWriter {
       }
 
       ew.put(this.field, v);
-      
+
     }
     return true;
   }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
index e2008be..947926b 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
@@ -16,6 +16,13 @@
  */
 package org.apache.solr.client.solrj.io;
 
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.client.solrj.io.eval.*;
 import org.apache.solr.client.solrj.io.graph.GatherNodesStream;
 import org.apache.solr.client.solrj.io.graph.ShortestPathStream;
@@ -23,6 +30,11 @@ import org.apache.solr.client.solrj.io.ops.DistinctOperation;
 import org.apache.solr.client.solrj.io.ops.GroupOperation;
 import org.apache.solr.client.solrj.io.ops.ReplaceOperation;
 import org.apache.solr.client.solrj.io.stream.*;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
@@ -31,6 +43,7 @@ import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.PercentileMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.StdMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
+import org.apache.solr.common.params.CommonParams;
 
 public class Lang {
 
@@ -56,6 +69,8 @@ public class Lang {
         .withFunctionName("group", GroupOperation.class)
         .withFunctionName("reduce", ReducerStream.class)
         .withFunctionName("parallel", ParallelStream.class)
+        .withFunctionName("cube", CubeStream.class)
+        .withFunctionName("input", LocalInputStream.class)
         .withFunctionName("rollup", RollupStream.class)
         .withFunctionName("stats", StatsStream.class)
         .withFunctionName("innerJoin", InnerJoinStream.class)
@@ -364,4 +379,89 @@ public class Lang {
         .withFunctionName("if", IfThenElseEvaluator.class)
         .withFunctionName("convert", ConversionEvaluator.class);
   }
+
+  /**
+   * Source stream registered as the <code>input</code> function. It emits no tuples of its own
+   * ({@link #read()} always returns null) and only advertises a sort order: the comparator is first
+   * parsed from the factory's default sort at construction time and re-parsed in {@link #open()}
+   * from the <code>sort</code> entry of the stream context when one is present.
+   */
+  public static class LocalInputStream extends TupleStream implements Expressible {
+
+    private StreamComparator streamComparator;
+    private String sort;
+
+    /**
+     * @param expression the parsed expression; it is not inspected
+     * @param factory factory whose default sort defines the initial stream comparator
+     * @throws IOException if the factory has no default sort or the sort spec is malformed
+     */
+    public LocalInputStream(StreamExpression expression, StreamFactory factory) throws IOException {
+      this.streamComparator = parseComp(factory.getDefaultSort());
+    }
+
+    /** Captures the <code>sort</code> entry of the context; it is parsed later by {@link #open()}. */
+    @Override
+    public void setStreamContext(StreamContext context) {
+      sort = (String)context.get(CommonParams.SORT);
+    }
+
+    /** This stream wraps no other streams; the existing null return value is kept. */
+    @Override
+    public List<TupleStream> children() {
+      return null;
+    }
+
+    /**
+     * Turns a comma separated sort spec such as <code>a_s asc, b_i desc</code> into a comparator.
+     * Any direction other than <code>asc</code> (case-insensitive) is treated as descending.
+     *
+     * @param sort the sort spec, one <code>field direction</code> pair per comma separated entry
+     * @return a FieldComparator for a single entry, a MultipleFieldComparator otherwise
+     * @throws IOException if the spec is missing, empty, or an entry is not a field/direction pair
+     */
+    private StreamComparator parseComp(String sort) throws IOException {
+      // A missing sort used to surface as a NullPointerException from split(); report it as a
+      // regular stream error instead.
+      if (sort == null || sort.trim().isEmpty()) {
+        throw new IOException("Invalid sort spec: no sort was provided");
+      }
+
+      String[] sorts = sort.split(",");
+      // A spec made only of commas splits into zero entries, which used to fail on comps[0] below.
+      if (sorts.length == 0) {
+        throw new IOException("Invalid sort spec:" + sort);
+      }
+
+      StreamComparator[] comps = new StreamComparator[sorts.length];
+      for(int i=0; i<sorts.length; i++) {
+        String s = sorts[i];
+
+        String[] spec = s.trim().split("\\s+"); //This should take into account spaces in the sort spec.
+
+        if (spec.length != 2) {
+          throw new IOException("Invalid sort spec:" + s);
+        }
+
+        String fieldName = spec[0].trim();
+        String order = spec[1].trim();
+
+        comps[i] = new FieldComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING);
+      }
+
+      if(comps.length > 1) {
+        return new MultipleFieldComparator(comps);
+      } else {
+        return comps[0];
+      }
+    }
+
+    /**
+     * Re-derives the comparator from the sort captured by {@link #setStreamContext(StreamContext)}.
+     *
+     * @throws IOException if the context sort is malformed
+     */
+    @Override
+    public void open() throws IOException {
+      // Keep the comparator derived from the factory when the context carries no sort of its own.
+      if (sort != null) {
+        streamComparator = parseComp(sort);
+      }
+    }
+
+    /** Nothing to release. */
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    /** This stream produces no tuples; it always returns null. */
+    @Override
+    public Tuple read() throws IOException {
+      return null;
+    }
+
+    /** Returns the comparator parsed from the most recently applied sort spec. */
+    @Override
+    public StreamComparator getStreamSort() {
+      return streamComparator;
+    }
+
+    /** Writes this stream as a parameterless call of the function name it is registered under. */
+    @Override
+    public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+      StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+      return expression;
+    }
+
+    /** Describes this stream as a stream source named <code>input</code>. */
+    @Override
+    public Explanation toExplanation(StreamFactory factory) throws IOException {
+      return new StreamExplanation(getStreamNodeId().toString())
+          .withFunctionName("input")
+          .withImplementingClass(this.getClass().getName())
+          .withExpressionType(Explanation.ExpressionType.STREAM_SOURCE)
+          .withExpression("--non-expressible--");
+    }
+  }
+
+
+
 }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CubeStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CubeStream.java
new file mode 100644
index 0000000..bae2076
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CubeStream.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+
+import static org.apache.solr.common.params.CommonParams.DISTRIB;
+import static org.apache.solr.common.params.CommonParams.SORT;
+import static org.apache.solr.common.params.CommonParams.Q;
+import static org.apache.solr.common.params.CommonParams.FL;
+
+/**
+ * The CubeStream decorates a TupleStream implementation and pushes its expression to the /export handler
+ * of each shard of a SolrCloud collection, together with the configured q, fl and sort parameters.
+ * Tuples that are streamed back from the shards are ordered by a Comparator.
+ * @since 5.1.0
+ **/
+public class CubeStream extends CloudSolrStream implements Expressible {
+
+  private TupleStream tupleStream;
+  private transient StreamFactory streamFactory;
+  private String sort;
+  private String fl;
+  private String q;
+
+  /**
+   * Creates a CubeStream around an already constructed stream.
+   *
+   * @param zkHost zkHost used to look up the shards of the collection
+   * @param collection collection whose shards receive the pushed expression
+   * @param tupleStream stream to push down; it must implement Expressible
+   * @param comp comparator stored in the inherited <code>comp</code> field and written out as the sort of the expression
+   * @param sortParam value sent as the <code>sort</code> request parameter
+   * @param flParam value sent as the <code>fl</code> request parameter
+   * @param qParam value sent as the <code>q</code> request parameter
+   * @throws IOException if tupleStream is not Expressible
+   */
+  public CubeStream(String zkHost,
+                    String collection,
+                    TupleStream tupleStream,
+                    StreamComparator comp,
+                    String sortParam,
+                    String flParam,
+                    String qParam) throws IOException {
+    init(zkHost,
+         collection,
+         tupleStream,
+         comp,
+         sortParam,
+         flParam,
+         qParam);
+  }
+
+
+  /**
+   * Variant that takes the stream to push down as an expression string. Parsing the string needs a
+   * StreamFactory, and the factory can only be attached through {@link #setStreamFactory(StreamFactory)}
+   * after construction, so the expression cannot be parsed here. This used to fail with a
+   * NullPointerException; it is now reported as an IOException with an actionable message.
+   *
+   * @throws IOException because no StreamFactory is available at construction time
+   */
+  public CubeStream(String zkHost,
+                    String collection,
+                    String expressionString,
+                    StreamComparator comp,
+                    String sortParam,
+                    String flParam,
+                    String qParam) throws IOException {
+    if(this.streamFactory == null) {
+      throw new IOException(String.format(Locale.ROOT,"Unable to create CubeStream from expression %s - no StreamFactory is available, use the TupleStream or StreamExpression constructor instead",expressionString));
+    }
+    TupleStream tStream = this.streamFactory.constructStream(expressionString);
+    init(zkHost,
+         collection,
+         tStream,
+         comp,
+         sortParam,
+         flParam,
+         qParam);
+  }
+
+  /** Sets the factory used to turn the wrapped stream back into an expression when it is pushed to the shards. */
+  public void setStreamFactory(StreamFactory streamFactory) {
+    this.streamFactory = streamFactory;
+  }
+
+  /**
+   * Builds a CubeStream from a parsed expression. The expression must provide the collection name
+   * as first operand, exactly one stream operand, and the named parameters <code>sort</code>,
+   * <code>fl</code> and <code>q</code>; <code>zkHost</code> is optional and falls back to the factory.
+   *
+   * @param expression the parsed expression
+   * @param factory factory used to resolve operands, the zkHost and the nested stream
+   * @throws IOException if a required operand or parameter is missing or no zkHost can be resolved
+   */
+  public CubeStream(StreamExpression expression, StreamFactory factory) throws IOException {
+    // grab all parameters out
+    String collectionName = factory.getValueOperand(expression, 0);
+    List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
+    StreamExpressionNamedParameter sortExpression = factory.getNamedOperand(expression, SORT);
+    StreamExpressionNamedParameter qExpression = factory.getNamedOperand(expression, Q);
+    StreamExpressionNamedParameter flExpression = factory.getNamedOperand(expression, FL);
+    StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
+
+    // Collection Name
+    if(null == collectionName){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
+    }
+
+    // Stream
+    if(1 != streamExpressions.size()){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
+    }
+
+    // sort, fl and q are mandatory plain values, checked in this order
+    String sortParam = requireValue(expression, sortExpression, "sort");
+    String flParam = requireValue(expression, flExpression, "fl");
+    String qParam = requireValue(expression, qExpression, "q");
+
+    // zkHost, optional - if not provided then will look into factory list to get
+    String zkHost = null;
+    if(null == zkHostExpression){
+      zkHost = factory.getCollectionZkHost(collectionName);
+      if(zkHost == null) {
+        zkHost = factory.getDefaultZkHost();
+      }
+    }
+    else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
+      zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
+    }
+    if(null == zkHost){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
+    }
+
+    // We've got all the required items.
+    // The nested stream is built with a copy of the factory that carries this sort as its default
+    // sort, so nested streams that read the factory's default sort while being constructed see it.
+    StreamFactory localFactory = (StreamFactory)factory.clone();
+    localFactory.withDefaultSort(sortParam);
+    TupleStream stream = localFactory.constructStream(streamExpressions.get(0));
+    StreamComparator comp = localFactory.constructComparator(sortParam, FieldComparator.class);
+    streamFactory = factory;
+    init(zkHost,collectionName,stream, comp, sortParam, flParam, qParam);
+  }
+
+  /**
+   * Returns the plain string value of a mandatory named parameter.
+   *
+   * @param expression the enclosing expression, used for the error message only
+   * @param param the named parameter as found in the expression, or null when it is absent
+   * @param name the parameter name reported in the error message
+   * @throws IOException if the parameter is absent or is not a plain value
+   */
+  private static String requireValue(StreamExpression expression, StreamExpressionNamedParameter param, String name) throws IOException {
+    if(null == param || !(param.getParameter() instanceof StreamExpressionValue)){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single '%s' parameter but didn't find one",expression, name));
+    }
+    return ((StreamExpressionValue)param.getParameter()).getValue();
+  }
+
+  /** Stores the configuration shared by all constructors and rejects streams that cannot be written as an expression. */
+  private void init(String zkHost,
+                    String collection,
+                    TupleStream tupleStream,
+                    StreamComparator comp,
+                    String sortParam,
+                    String flParam,
+                    String qParam) throws IOException{
+    this.zkHost = zkHost;
+    this.collection = collection;
+    this.comp = comp;
+    this.tupleStream = tupleStream;
+    this.fl = flParam;
+    this.q = qParam;
+    this.sort = sortParam;
+
+    // requires Expressible stream and comparator
+    if(! (tupleStream instanceof Expressible)){
+      throw new IOException("Unable to create CubeStream with a non-expressible TupleStream.");
+    }
+  }
+
+  /** Writes this stream, including the wrapped stream, as an expression. */
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException{
+    return toExpression(factory, true);
+  }
+
+  /**
+   * Writes this stream as an expression.
+   *
+   * @param factory factory used to look up function names
+   * @param includeStreams when false the wrapped stream is replaced by the <code>&lt;stream&gt;</code> placeholder
+   * @throws IOException if the wrapped stream has to be written out but is not Expressible
+   */
+  private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+    // collection
+    expression.addParameter(collection);
+
+    if(includeStreams){
+      if(tupleStream instanceof Expressible){
+        expression.addParameter(((Expressible)tupleStream).toExpression(factory));
+      }
+      else{
+        throw new IOException("This CubeStream contains a non-expressible TupleStream - it cannot be converted to an expression");
+      }
+    }
+    else{
+      expression.addParameter("<stream>");
+    }
+
+    // sort
+    expression.addParameter(new StreamExpressionNamedParameter(SORT,comp.toExpression(factory)));
+
+    // q and fl are required by the expression constructor, so they have to be written out for the
+    // resulting expression to be parseable again
+    if(q != null) {
+      expression.addParameter(new StreamExpressionNamedParameter(Q, q));
+    }
+    if(fl != null) {
+      expression.addParameter(new StreamExpressionNamedParameter(FL, fl));
+    }
+
+    // zkHost
+    expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
+
+    return expression;
+  }
+
+  /** Describes this stream as a stream decorator; the wrapped stream is shown as a placeholder. */
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+
+    StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
+
+    explanation.setFunctionName(factory.getFunctionName(this.getClass()));
+    explanation.setImplementingClass(this.getClass().getName());
+    explanation.setExpressionType(ExpressionType.STREAM_DECORATOR);
+    explanation.setExpression(toExpression(factory, false).toString());
+
+    return explanation;
+  }
+
+  /** Returns a new list holding the single wrapped stream. */
+  @Override
+  public List<TupleStream> children() {
+    List<TupleStream> l = new ArrayList<>();
+    l.add(tupleStream);
+    return l;
+  }
+
+  /** Returns the next tuple from the inherited <code>_read()</code>; an EOF tuple is replaced by a fresh <code>Tuple.EOF()</code>. */
+  @Override
+  public Tuple read() throws IOException {
+    Tuple tuple = _read();
+
+    if(tuple.EOF) {
+      return Tuple.EOF();
+    }
+
+    return tuple;
+  }
+
+  /** Stores the context, adopts its StreamFactory when none has been set, and passes the context on to the wrapped stream. */
+  @Override
+  public void setStreamContext(StreamContext streamContext) {
+    this.streamContext = streamContext;
+    if(streamFactory == null) {
+      this.streamFactory = streamContext.getStreamFactory();
+    }
+    this.tupleStream.setStreamContext(streamContext);
+  }
+
+  /**
+   * Creates one SolrStream per shard URL of the collection. Each request targets the /export handler
+   * with distrib=false and carries the wrapped stream as <code>expr</code> plus the fl, sort and q values.
+   *
+   * @throws IOException wrapping any failure raised while building the streams
+   */
+  protected void constructStreams() throws IOException {
+    try {
+      Object pushStream = ((Expressible) tupleStream).toExpression(streamFactory);
+
+      List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);
+
+      for(String url : shardUrls) {
+        ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
+        paramsLoc.set(DISTRIB,"false"); // We are the aggregator.
+        paramsLoc.set("expr", pushStream.toString());
+        paramsLoc.set("qt","/export");
+        paramsLoc.set("fl", fl);
+        paramsLoc.set("sort", sort);
+        paramsLoc.set("q", q);
+        SolrStream solrStream = new SolrStream(url, paramsLoc);
+        solrStream.setStreamContext(streamContext);
+        solrStreams.add(solrStream);
+      }
+
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
index d32b097..883c57b 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
@@ -49,11 +49,17 @@ public class StreamFactory implements Serializable {
   private transient HashMap<String,Supplier<Class<? extends Expressible>>> functionNames;
   private transient String defaultZkHost;
   private transient String defaultCollection;
+  private transient String defaultSort;
   
   public StreamFactory(){
     collectionZkHosts = new HashMap<>();
     functionNames = new HashMap<>();
   }
+
+  /**
+   * Creates a factory on top of an existing function name registry. The supplied map is used as is
+   * (it is not copied) and the factory starts without any collection to zkHost mappings.
+   *
+   * @param functionNames the function name registry to use
+   */
+  public StreamFactory(HashMap<String,Supplier<Class<? extends Expressible>>> functionNames) {
+    collectionZkHosts = new HashMap<>();
+    this.functionNames = functionNames;
+  }
   
   public StreamFactory withCollectionZkHost(String collectionName, String zkHost){
     this.collectionZkHosts.put(collectionName, zkHost);
@@ -70,6 +76,21 @@ public class StreamFactory implements Serializable {
     return this;
   }
 
+  /**
+   * Creates a shallow copy of this factory. The function name registry is shared with the copy,
+   * while the collection to zkHost mappings, the default zkHost, the default collection and the
+   * default sort are copied, so changing them on the copy does not affect this factory.
+   *
+   * @return the copy, typed as Object to keep the existing signature
+   */
+  public Object clone() {
+    //Shallow copy
+    StreamFactory clone = new StreamFactory(functionNames);
+    // Copy the state field by field. Building the copy through
+    // withCollectionZkHost(defaultCollection, defaultZkHost) registered only that single mapping
+    // (with a null key when there is no default collection) and lost all other collection mappings.
+    if (collectionZkHosts != null) {
+      clone.collectionZkHosts.putAll(collectionZkHosts);
+    }
+    clone.defaultZkHost = defaultZkHost;
+    clone.defaultCollection = defaultCollection;
+    clone.defaultSort = defaultSort;
+    return clone;
+  }
+
+  /**
+   * Sets the default sort spec of this factory.
+   *
+   * @param sort the sort spec to store; it is stored as given, without validation
+   * @return this factory, to allow call chaining
+   */
+  public StreamFactory withDefaultSort(String sort) {
+    defaultSort = sort;
+    return this;
+  }
+
+  /**
+   * @return the default sort spec set through withDefaultSort, or null when none has been set
+   */
+  public String getDefaultSort() {
+    return defaultSort;
+  }
+
   public String getDefaultZkHost() {
     return this.defaultZkHost;
   }