You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2020/06/29 00:48:53 UTC

[lucene-solr] branch master updated: SOLR-14481: Add drill Streaming Expression

This is an automated email from the ASF dual-hosted git repository.

jbernste pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bb9926  SOLR-14481: Add drill Streaming Expression
7bb9926 is described below

commit 7bb9926ef282321361a7f90a6f577a72121c6582
Author: Joel Bernstein <jb...@apache.org>
AuthorDate: Sun Jun 28 20:41:28 2020 -0400

    SOLR-14481: Add drill Streaming Expression
---
 .../java/org/apache/solr/client/solrj/io/Lang.java | 100 +++++++-
 .../solr/client/solrj/io/stream/DrillStream.java   | 280 +++++++++++++++++++++
 .../org/apache/solr/client/solrj/io/TestLang.java  |   2 +-
 .../solrj/io/stream/StreamExpressionTest.java      |  73 +++++-
 4 files changed, 450 insertions(+), 5 deletions(-)

diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
index e2008be..3ea5492 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
@@ -31,7 +31,18 @@ import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.PercentileMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.StdMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
-
+import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.common.params.CommonParams;
+import java.io.IOException;
+import java.util.List;
 public class Lang {
 
   public static void register(StreamFactory streamFactory) {
@@ -285,6 +296,8 @@ public class Lang {
         .withFunctionName("pairSort", PairSortEvaluator.class)
         .withFunctionName("recip", RecipEvaluator.class)
         .withFunctionName("pivot", PivotEvaluator.class)
+        .withFunctionName("drill", DrillStream.class)
+        .withFunctionName("input", LocalInputStream.class)
         .withFunctionName("ltrim", LeftShiftEvaluator.class)
         .withFunctionName("rtrim", RightShiftEvaluator.class)
         .withFunctionName("repeat", RepeatEvaluator.class)
@@ -364,4 +377,89 @@ public class Lang {
         .withFunctionName("if", IfThenElseEvaluator.class)
         .withFunctionName("convert", ConversionEvaluator.class);
   }
+
+
+  /**
+   * Placeholder stream registered under the {@code input} function name.
+   *
+   * <p>The stream produces no tuples of its own ({@link #read()} always returns {@code null});
+   * what it contributes is a sort order, exposed through {@link #getStreamSort()}. The comparator
+   * is first derived from the factory's default sort at construction time and is rebuilt in
+   * {@link #open()} from the {@code sort} entry of the stream context.
+   */
+  public static class LocalInputStream extends TupleStream implements Expressible {
+
+    private StreamComparator streamComparator;
+    private String sort;
+
+    /**
+     * Creates the stream from an {@code input()} expression.
+     *
+     * @param expression the parsed expression; it is not inspected by this constructor
+     * @param factory factory whose default sort defines the initial stream sort
+     * @throws IOException if the factory's default sort is missing or malformed
+     */
+    public LocalInputStream(StreamExpression expression, StreamFactory factory) throws IOException {
+      this.streamComparator = parseComp(factory.getDefaultSort());
+    }
+
+    /**
+     * Remembers the {@code sort} entry of the context; it is parsed later, in {@link #open()}.
+     */
+    @Override
+    public void setStreamContext(StreamContext context) {
+      sort = (String)context.get(CommonParams.SORT);
+    }
+
+    /**
+     * @return an empty list: this stream wraps no other streams. (An empty list rather than
+     *         {@code null} so that code walking the stream tree can iterate it safely.)
+     */
+    @Override
+    public List<TupleStream> children() {
+      return new java.util.ArrayList<>();
+    }
+
+    /**
+     * Turns a sort specification such as {@code "a_s desc, a_f asc"} into a comparator.
+     *
+     * <p>Each comma separated clause must consist of exactly two whitespace separated tokens, the
+     * field name and the direction. A direction of {@code asc} (case-insensitive) sorts ascending;
+     * any other direction token sorts descending.
+     *
+     * @param sort the sort specification
+     * @return a {@link FieldComparator} for a single clause, otherwise a
+     *         {@link MultipleFieldComparator} over all clauses
+     * @throws IOException if {@code sort} is {@code null} or a clause does not have two tokens
+     */
+    private StreamComparator parseComp(String sort) throws IOException {
+
+      // Report a missing sort through the declared exception instead of a bare NullPointerException.
+      if (sort == null) {
+        throw new IOException("Invalid sort spec:" + sort);
+      }
+
+      String[] sorts = sort.split(",");
+      StreamComparator[] comps = new StreamComparator[sorts.length];
+      for(int i=0; i<sorts.length; i++) {
+        String s = sorts[i];
+
+        // Splitting on runs of whitespace tolerates extra spaces inside a clause.
+        String[] spec = s.trim().split("\\s+");
+
+        if (spec.length != 2) {
+          throw new IOException("Invalid sort spec:" + s);
+        }
+
+        String fieldName = spec[0].trim();
+        String order = spec[1].trim();
+
+        ComparatorOrder direction = order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING;
+        comps[i] = new FieldComparator(fieldName, direction);
+      }
+
+      if(comps.length > 1) {
+        return new MultipleFieldComparator(comps);
+      } else {
+        return comps[0];
+      }
+    }
+
+    /**
+     * Rebuilds the comparator from the sort captured by {@link #setStreamContext(StreamContext)}.
+     *
+     * @throws IOException if no sort was captured or it is malformed
+     */
+    @Override
+    public void open() throws IOException {
+      streamComparator = parseComp(sort);
+    }
+
+    /** Nothing to release. */
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    /**
+     * Always returns {@code null}; this class never emits tuples itself.
+     * NOTE(review): presumably the hosting component supplies the actual tuples - confirm.
+     */
+    @Override
+    public Tuple read() throws IOException {
+      return null;
+    }
+
+    /**
+     * @return the comparator built by the constructor or by the most recent {@link #open()}
+     */
+    @Override
+    public StreamComparator getStreamSort() {
+      return streamComparator;
+    }
+
+    /**
+     * @return an expression consisting only of the function name registered for this class
+     */
+    @Override
+    public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+      StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+      return expression;
+    }
+
+    /**
+     * @return a {@code STREAM_SOURCE} explanation whose expression text is the fixed
+     *         {@code --non-expressible--} marker
+     */
+    @Override
+    public Explanation toExplanation(StreamFactory factory) throws IOException {
+      return new StreamExplanation(getStreamNodeId().toString())
+          .withFunctionName("input")
+          .withImplementingClass(this.getClass().getName())
+          .withExpressionType(Explanation.ExpressionType.STREAM_SOURCE)
+          .withExpression("--non-expressible--");
+    }
+  }
+
+
 }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DrillStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DrillStream.java
new file mode 100644
index 0000000..c599b75
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DrillStream.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.params.ModifiableSolrParams;
+
+import static org.apache.solr.common.params.CommonParams.DISTRIB;
+import static org.apache.solr.common.params.CommonParams.SORT;
+import static org.apache.solr.common.params.CommonParams.Q;
+import static org.apache.solr.common.params.CommonParams.FL;
+
+
+/**
+ * Stream decorator registered as the {@code drill} function.
+ *
+ * <p>The wrapped stream is serialized back into an expression and sent, together with the
+ * {@code q}, {@code fl} and {@code sort} parameters, to the {@code /export} handler of every shard
+ * URL returned by {@code getShards} (with {@code distrib=false}). Reading from the resulting shard
+ * streams is delegated to the inherited {@code _read()}.
+ */
+public class DrillStream extends CloudSolrStream implements Expressible {
+
+  private TupleStream tupleStream;
+  private transient StreamFactory streamFactory;
+  private String sort;
+  private String fl;
+  private String q;
+
+  /**
+   * Creates a drill stream around an already constructed stream.
+   *
+   * @param zkHost ZooKeeper host used to locate the collection
+   * @param collection collection whose shards are queried
+   * @param tupleStream stream to push down to the shards; must implement {@link Expressible}
+   * @param comp comparator describing the sort of the returned tuples
+   * @param sortParam value sent to each shard as the {@code sort} parameter
+   * @param flParam value sent to each shard as the {@code fl} parameter
+   * @param qParam value sent to each shard as the {@code q} parameter
+   * @throws IOException if {@code tupleStream} is not {@link Expressible}
+   */
+  public DrillStream(String zkHost,
+                    String collection,
+                    TupleStream tupleStream,
+                    StreamComparator comp,
+                    String sortParam,
+                    String flParam,
+                    String qParam) throws IOException {
+    init(zkHost,
+        collection,
+        tupleStream,
+        comp,
+        sortParam,
+        flParam,
+        qParam);
+  }
+
+  /**
+   * Creates a drill stream from the textual form of the stream to push down.
+   *
+   * <p>Parsing the text needs a {@link StreamFactory}, but the {@code streamFactory} field can only
+   * be assigned after construction (see {@link #setStreamFactory(StreamFactory)}), so it is still
+   * {@code null} here. The constructor therefore reports the problem with a descriptive
+   * {@link IOException} instead of failing with a {@link NullPointerException}.
+   *
+   * @throws IOException if no factory is available to parse {@code expressionString}, or if the
+   *         parsed stream is not {@link Expressible}
+   */
+  public DrillStream(String zkHost,
+                    String collection,
+                    String expressionString,
+                    StreamComparator comp,
+                    String sortParam,
+                    String flParam,
+                    String qParam) throws IOException {
+    if(null == this.streamFactory){
+      throw new IOException("Unable to create DrillStream from expression string '" + expressionString
+          + "' - no StreamFactory is available; use the TupleStream based constructor instead");
+    }
+    TupleStream tStream = this.streamFactory.constructStream(expressionString);
+    init(zkHost,
+        collection,
+        tStream,
+        comp,
+        sortParam,
+        flParam,
+        qParam);
+  }
+
+  /**
+   * Sets the factory used to serialize the wrapped stream when the shard requests are built.
+   */
+  public void setStreamFactory(StreamFactory streamFactory) {
+    this.streamFactory = streamFactory;
+  }
+
+  /**
+   * Creates a drill stream from a parsed {@code drill(...)} expression.
+   *
+   * <p>The expression must provide the collection name as first operand, exactly one stream
+   * operand, and the named parameters {@code sort}, {@code fl} and {@code q}. {@code zkHost} is
+   * optional; when absent it is looked up in the factory, first per collection and then as the
+   * factory default.
+   *
+   * @param expression the expression to interpret
+   * @param factory factory used to resolve operands and to build the wrapped stream
+   * @throws IOException if a required operand is missing or malformed, or no zkHost can be found
+   */
+  public DrillStream(StreamExpression expression, StreamFactory factory) throws IOException {
+    // grab all parameters out
+    String collectionName = factory.getValueOperand(expression, 0);
+    List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
+    StreamExpressionNamedParameter sortExpression = factory.getNamedOperand(expression, SORT);
+    StreamExpressionNamedParameter qExpression = factory.getNamedOperand(expression, Q);
+    StreamExpressionNamedParameter flExpression = factory.getNamedOperand(expression, FL);
+    StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
+
+    // Collection Name
+    if(null == collectionName){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
+    }
+
+    // Stream
+    if(1 != streamExpressions.size()){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
+    }
+
+    // Sort
+    if(null == sortExpression || !(sortExpression.getParameter() instanceof StreamExpressionValue)){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'sort' parameter but didn't find one",expression));
+    }
+    String sortParam = ((StreamExpressionValue)sortExpression.getParameter()).getValue();
+
+    // fl
+    if(null == flExpression || !(flExpression.getParameter() instanceof StreamExpressionValue)){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'fl' parameter but didn't find one",expression));
+    }
+    String flParam = ((StreamExpressionValue)flExpression.getParameter()).getValue();
+
+    // q
+    if(null == qExpression || !(qExpression.getParameter() instanceof StreamExpressionValue)){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'q' parameter but didn't find one",expression));
+    }
+    String qParam = ((StreamExpressionValue)qExpression.getParameter()).getValue();
+
+    // zkHost, optional - if not provided then will look into factory list to get
+    String zkHost = null;
+    if(null == zkHostExpression){
+      zkHost = factory.getCollectionZkHost(collectionName);
+      if(zkHost == null) {
+        zkHost = factory.getDefaultZkHost();
+      }
+    }
+    else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
+      zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
+    }
+    if(null == zkHost){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
+    }
+
+    // We've got all the required items.
+    // The wrapped stream is built with a cloned factory carrying this expression's sort as its
+    // default sort, so the caller's factory is left untouched.
+    StreamFactory localFactory = (StreamFactory)factory.clone();
+    localFactory.withDefaultSort(sortParam);
+    TupleStream stream = localFactory.constructStream(streamExpressions.get(0));
+    StreamComparator comp = localFactory.constructComparator(sortParam, FieldComparator.class);
+    streamFactory = factory;
+    init(zkHost,collectionName,stream, comp, sortParam, flParam, qParam);
+  }
+
+  /**
+   * Shared constructor logic: stores all settings and verifies that the wrapped stream can be
+   * turned back into an expression.
+   *
+   * @throws IOException if {@code tupleStream} is not {@link Expressible}
+   */
+  private void init(String zkHost,
+                    String collection,
+                    TupleStream tupleStream,
+                    StreamComparator comp,
+                    String sortParam,
+                    String flParam,
+                    String qParam) throws IOException{
+    this.zkHost = zkHost;
+    this.collection = collection;
+    this.comp = comp;
+    this.tupleStream = tupleStream;
+    this.fl = flParam;
+    this.q = qParam;
+    this.sort = sortParam;
+
+    // requires Expressible stream and comparator
+    if(! (tupleStream instanceof Expressible)){
+      throw new IOException("Unable to create DrillStream with a non-expressible TupleStream.");
+    }
+  }
+
+  /**
+   * Serializes this stream, including the wrapped stream, back into an expression.
+   */
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException{
+    return toExpression(factory, true);
+  }
+
+  /**
+   * Builds the expression form of this stream.
+   *
+   * @param factory factory used to look up function names and serialize operands
+   * @param includeStreams when {@code false} the wrapped stream is replaced by the literal
+   *        {@code <stream>} placeholder
+   * @throws IOException if the wrapped stream must be included but is not {@link Expressible}
+   */
+  private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+    // collection
+    expression.addParameter(collection);
+
+    if(includeStreams){
+      if(tupleStream instanceof Expressible){
+        expression.addParameter(((Expressible)tupleStream).toExpression(factory));
+      }
+      else{
+        throw new IOException("This DrillStream contains a non-expressible TupleStream - it cannot be converted to an expression");
+      }
+    }
+    else{
+      expression.addParameter("<stream>");
+    }
+
+    // q and fl - both are required by the expression constructor, so they have to be written
+    // out for the result to be parseable again
+    if(null != q){
+      expression.addParameter(new StreamExpressionNamedParameter(Q, q));
+    }
+    if(null != fl){
+      expression.addParameter(new StreamExpressionNamedParameter(FL, fl));
+    }
+
+    // sort
+    expression.addParameter(new StreamExpressionNamedParameter(SORT,comp.toExpression(factory)));
+
+    // zkHost
+    expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
+
+    return expression;
+  }
+
+  /**
+   * Describes this stream as a {@code STREAM_DECORATOR}; the expression text uses the
+   * {@code <stream>} placeholder instead of the wrapped stream.
+   */
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+
+    StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
+
+    explanation.setFunctionName(factory.getFunctionName(this.getClass()));
+    explanation.setImplementingClass(this.getClass().getName());
+    explanation.setExpressionType(ExpressionType.STREAM_DECORATOR);
+    explanation.setExpression(toExpression(factory, false).toString());
+
+    return explanation;
+  }
+
+  /**
+   * @return a new list holding only the wrapped stream
+   */
+  @Override
+  public List<TupleStream> children() {
+    List<TupleStream> l = new ArrayList<>();
+    l.add(tupleStream);
+    return l;
+  }
+
+  /**
+   * Returns the next tuple from the inherited {@code _read()}. When that tuple is flagged EOF a
+   * fresh {@code Tuple.EOF()} is returned in its place.
+   */
+  @Override
+  public Tuple read() throws IOException {
+    Tuple tuple = _read();
+
+    if(tuple.EOF) {
+      return Tuple.EOF();
+    }
+
+    return tuple;
+  }
+
+  /**
+   * Stores the context, passes it on to the wrapped stream and, if no factory has been set yet,
+   * adopts the context's stream factory.
+   */
+  @Override
+  public void setStreamContext(StreamContext streamContext) {
+    this.streamContext = streamContext;
+    if(streamFactory == null) {
+      this.streamFactory = streamContext.getStreamFactory();
+    }
+    this.tupleStream.setStreamContext(streamContext);
+  }
+
+  /**
+   * Creates one {@link SolrStream} per shard URL and adds it to {@code solrStreams}. Every request
+   * targets the {@code /export} handler with {@code distrib=false} and carries the serialized
+   * wrapped stream as {@code expr} plus the {@code fl}, {@code sort} and {@code q} parameters.
+   *
+   * @throws IOException if serializing the wrapped stream or resolving the shards fails; other
+   *         exceptions are wrapped in an {@link IOException}
+   */
+  protected void constructStreams() throws IOException {
+
+    try {
+
+      // The pushed-down expression is the same for every shard, so serialize it only once.
+      String pushExpr = ((Expressible) tupleStream).toExpression(streamFactory).toString();
+
+      List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);
+
+      for(String url : shardUrls) {
+        ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
+        paramsLoc.set(DISTRIB,"false"); // We are the aggregator.
+        paramsLoc.set("expr", pushExpr);
+        paramsLoc.set("qt","/export");
+        paramsLoc.set(FL, fl);
+        paramsLoc.set(SORT, sort);
+        paramsLoc.set(Q, q);
+        SolrStream solrStream = new SolrStream(url, paramsLoc);
+        solrStream.setStreamContext(streamContext);
+        solrStreams.add(solrStream);
+      }
+
+    } catch (IOException e) {
+      // Already the declared type - do not wrap it a second time.
+      throw e;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+}
\ No newline at end of file
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
index 160b108..61cc90d 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
@@ -85,7 +85,7 @@ public class TestLang extends SolrTestCase {
       "getSupportPoints", "pairSort", "log10", "plist", "recip", "pivot", "ltrim", "rtrim", "export",
       "zplot", "natural", "repeat", "movingMAD", "hashRollup", "noop", "var", "stddev", "recNum", "isNull",
       "notNull", "matches", "projectToBorder", "double", "long", "parseCSV", "parseTSV", "dateTime",
-       "split", "upper", "trim", "lower", "trunc", "cosine", "dbscan", "per", "std"};
+       "split", "upper", "trim", "lower", "trunc", "cosine", "dbscan", "per", "std", "drill", "input"};
 
   @Test
   public void testLang() {
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index fa72020..7d8a062 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -1137,6 +1137,75 @@ public class StreamExpressionTest extends SolrCloudTestCase {
 
 
   @Test
+  public void testDrillStream() throws Exception {
+
+    // Ten documents spread over three a_s buckets: hello0 (4 docs), hello3 (4 docs), hello4 (2 docs).
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    List<Tuple> tuples;
+
+    // The inner rollup is pushed down by drill(); the outer rollup re-aggregates the
+    // partial counts and sums returned by drill().
+    ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
+    String expr = "rollup(select(drill("
+        + "                            collection1, "
+        + "                            q=\"*:*\", "
+        + "                            fl=\"a_s, a_f\", "
+        + "                            sort=\"a_s desc\", "
+        + "                            rollup(input(), over=\"a_s\", count(*), sum(a_f)))," +
+        "                        a_s, count(*) as cnt, sum(a_f) as saf)," +
+        "                  over=\"a_s\"," +
+        "                  sum(cnt), sum(saf)"
+        + ")";
+    paramsLoc.set("expr", expr);
+    paramsLoc.set("qt", "/stream");
+
+    String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS;
+    TupleStream solrStream = new SolrStream(url, paramsLoc);
+
+    StreamContext context = new StreamContext();
+    solrStream.setStreamContext(context);
+    tuples = getTuples(solrStream);
+
+    // One tuple per bucket, in descending a_s order.
+    assertEquals(3, tuples.size());
+
+    Tuple tuple = tuples.get(0);
+    String bucket = tuple.getString("a_s");
+    Double count = tuple.getDouble("sum(cnt)");
+    Double saf = tuple.getDouble("sum(saf)");
+
+    assertEquals("hello4", bucket);
+    assertEquals(2.0, count.doubleValue(), 0.0);
+    assertEquals(11.0, saf.doubleValue(), 0.0);
+
+    tuple = tuples.get(1);
+    bucket = tuple.getString("a_s");
+    count = tuple.getDouble("sum(cnt)");
+    saf = tuple.getDouble("sum(saf)");
+
+    assertEquals("hello3", bucket);
+    assertEquals(4.0, count.doubleValue(), 0.0);
+    assertEquals(26.0, saf.doubleValue(), 0.0);
+
+    tuple = tuples.get(2);
+    bucket = tuple.getString("a_s");
+    count = tuple.getDouble("sum(cnt)");
+    saf = tuple.getDouble("sum(saf)");
+
+    assertEquals("hello0", bucket);
+    assertEquals(4.0, count.doubleValue(), 0.0);
+    assertEquals(18.0, saf.doubleValue(), 0.0);
+
+  }
+
+    @Test
   public void testFacetStream() throws Exception {
 
     new UpdateRequest()
@@ -1275,9 +1344,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     perf = tuple.getDouble("per(a_f,50)");
 
 
-    System.out.println("STD and Per:"+stdi+":"+stdf+":"+peri+":"+perf);
-//STD and Per:4.509249752822894:2.6457513110645907:11.0:7.0
-    //assert(false);
+
 
     assertTrue(bucket.equals("hello3"));
     assertTrue(sumi.doubleValue() == 38.0D);