You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2018/12/27 20:03:18 UTC

lucene-solr:branch_7x: SOLR-13088: Add zplot Stream Evaluator to plot math expressions in Apache Zeppelin

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_7x bcf1a4eaf -> c236ad8a1


SOLR-13088: Add zplot Stream Evaluator to plot math expressions in Apache Zeppelin


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/c236ad8a
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/c236ad8a
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/c236ad8a

Branch: refs/heads/branch_7x
Commit: c236ad8a18a6d48a5d1be321fe22865052929064
Parents: bcf1a4e
Author: Joel Bernstein <jb...@apache.org>
Authored: Thu Dec 27 14:42:03 2018 -0500
Committer: Joel Bernstein <jb...@apache.org>
Committed: Thu Dec 27 14:58:41 2018 -0500

----------------------------------------------------------------------
 .../org/apache/solr/client/solrj/io/Lang.java   |   4 +-
 .../client/solrj/io/stream/ZplotStream.java     | 208 +++++++++++++++++++
 .../apache/solr/client/solrj/io/TestLang.java   |   3 +-
 .../solrj/io/stream/MathExpressionTest.java     |  77 +++++++
 4 files changed, 290 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c236ad8a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
index 050fa7e..a1a796d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
@@ -92,8 +92,10 @@ public class Lang {
         .withFunctionName("tuple", TupStream.class)
         .withFunctionName("sql", SqlStream.class)
         .withFunctionName("plist", ParallelListStream.class)
+        .withFunctionName("zplot", ZplotStream.class)
 
-            // metrics
+
+        // metrics
         .withFunctionName("min", MinMetric.class)
         .withFunctionName("max", MaxMetric.class)
         .withFunctionName("avg", MeanMetric.class)

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c236ad8a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ZplotStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ZplotStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ZplotStream.java
new file mode 100644
index 0000000..c5280dc
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ZplotStream.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.eval.StreamEvaluator;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public class ZplotStream extends TupleStream implements Expressible {
+
+  private static final long serialVersionUID = 1;
+  private StreamContext streamContext;
+  private Map letParams = new LinkedHashMap();
+  private Iterator<Tuple> out;
+
+  public ZplotStream(StreamExpression expression, StreamFactory factory) throws IOException {
+
+    List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
+    //Get all the named params
+
+    for(StreamExpressionParameter np : namedParams) {
+      String name = ((StreamExpressionNamedParameter)np).getName();
+      StreamExpressionParameter param = ((StreamExpressionNamedParameter)np).getParameter();
+      if(param instanceof StreamExpressionValue) {
+        String paramValue = ((StreamExpressionValue) param).getValue();
+        letParams.put(name, factory.constructPrimitiveObject(paramValue));
+      } else if(factory.isEvaluator((StreamExpression)param)) {
+        StreamEvaluator evaluator = factory.constructEvaluator((StreamExpression) param);
+        letParams.put(name, evaluator);
+      }
+    }
+  }
+
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException{
+    return toExpression(factory, true);
+  }
+
+  private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+    return expression;
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+
+    StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
+    explanation.setFunctionName(factory.getFunctionName(this.getClass()));
+    explanation.setImplementingClass(this.getClass().getName());
+    explanation.setExpressionType(ExpressionType.STREAM_DECORATOR);
+    explanation.setExpression(toExpression(factory, false).toString());
+
+    return explanation;
+  }
+
+  public void setStreamContext(StreamContext context) {
+    this.streamContext = context;
+  }
+
+  public List<TupleStream> children() {
+    List<TupleStream> l =  new ArrayList<TupleStream>();
+    return l;
+  }
+
+  public Tuple read() throws IOException {
+    if(out.hasNext()) {
+      return out.next();
+    } else {
+      Map m = new HashMap();
+      m.put("EOF", true);
+      Tuple t = new Tuple(m);
+      return t;
+    }
+  }
+
+  public void close() throws IOException {
+  }
+
+  public void open() throws IOException {
+    Map<String, Object> lets = streamContext.getLets();
+    Set<Map.Entry<String, Object>> entries = letParams.entrySet();
+    Map<String, Object> evaluated = new HashMap();
+
+    //Load up the StreamContext with the data created by the letParams.
+    int numTuples = -1;
+    int columns = 0;
+    boolean table = false;
+    for(Map.Entry<String, Object> entry : entries) {
+      ++columns;
+
+      String name = entry.getKey();
+      if(name.equals("table")) {
+        table = true;
+      }
+
+      Object o = entry.getValue();
+      if(o instanceof StreamEvaluator) {
+        Tuple eTuple = new Tuple(lets);
+        StreamEvaluator evaluator = (StreamEvaluator)o;
+        evaluator.setStreamContext(streamContext);
+        Object eo = evaluator.evaluate(eTuple);
+        if(eo instanceof List) {
+          List l = (List)eo;
+          if(numTuples == -1) {
+            numTuples = l.size();
+          } else {
+            if(l.size() != numTuples) {
+              throw new IOException("All lists provided to the zplot function must be the same length.");
+            }
+          }
+          evaluated.put(name, l);
+        } else if (eo instanceof Tuple) {
+          evaluated.put(name, eo);
+        }
+      } else {
+        Object eval = lets.get(o);
+        if(eval instanceof List) {
+          List l = (List)eval;
+          if(numTuples == -1) {
+            numTuples = l.size();
+          } else {
+            if(l.size() != numTuples) {
+              throw new IOException("All lists provided to the zplot function must be the same length.");
+            }
+          }
+          evaluated.put(name, l);
+        } else if(eval instanceof Tuple) {
+          evaluated.put(name, eval);
+        }
+      }
+    }
+
+    if(columns > 1 && table) {
+      throw new IOException("If the table parameter is set there can only be one parameter.");
+    }
+    //Load the values into tuples
+
+    List<Tuple> outTuples = new ArrayList();
+    if(!table) {
+      //Handle the vectors
+      for (int i = 0; i < numTuples; i++) {
+        Tuple tuple = new Tuple(new HashMap());
+        for (String key : evaluated.keySet()) {
+          List l = (List) evaluated.get(key);
+          tuple.put(key, l.get(i));
+        }
+
+        outTuples.add(tuple);
+      }
+    } else {
+      //Handle the Tuple and List of Tuples
+      Object o = evaluated.get("table");
+      if(o instanceof List) {
+        List<Tuple> tuples = (List<Tuple>)o;
+        outTuples.addAll(tuples);
+      } else if(o instanceof Tuple) {
+        outTuples.add((Tuple)o);
+      }
+    }
+
+    this.out = outTuples.iterator();
+  }
+
+  /** Return the stream sort - ie, the order in which records are returned */
+  public StreamComparator getStreamSort(){
+    return null;
+  }
+
+  public int getCost() {
+    return 0;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c236ad8a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
index 3b238c2..b5b7317 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
@@ -73,7 +73,8 @@ public class TestLang extends LuceneTestCase {
       "outliers", "stream", "getCache", "putCache", "listCache", "removeCache", "zscores", "latlonVectors",
       "convexHull", "getVertices", "getBaryCenter", "getArea", "getBoundarySize","oscillate",
       "getAmplitude", "getPhase", "getAngularFrequency", "enclosingDisk", "getCenter", "getRadius",
-      "getSupportPoints", "pairSort", "log10", "plist", "recip", "pivot", "ltrim", "rtrim", "export"};
+      "getSupportPoints", "pairSort", "log10", "plist", "recip", "pivot", "ltrim", "rtrim", "export",
+      "zplot"};
 
   @Test
   public void testLang() {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c236ad8a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
index 8ac184a..8e973d2 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
@@ -1356,6 +1356,83 @@ public class MathExpressionTest extends SolrCloudTestCase {
     assertTrue(tuples.get(0).getLong("i")== 2);
   }
 
+
+  @Test
+  public void testZplot() throws Exception {
+    String cexpr = "let(c=tuple(a=add(1,2), b=add(2,3))," +
+        "               zplot(table=c))";
+
+    ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
+    paramsLoc.set("expr", cexpr);
+    paramsLoc.set("qt", "/stream");
+    String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS;
+    TupleStream solrStream = new SolrStream(url, paramsLoc);
+    StreamContext context = new StreamContext();
+    solrStream.setStreamContext(context);
+    List<Tuple> tuples = getTuples(solrStream);
+    assertTrue(tuples.size() == 1);
+    Tuple out = tuples.get(0);
+
+    assertEquals(out.getDouble("a").doubleValue(), 3.0, 0.0);
+    assertEquals(out.getDouble("b").doubleValue(), 5.0, 0.0);
+
+    cexpr = "let(c=list(tuple(a=add(1,2), b=add(2,3)), tuple(a=add(1,3), b=add(2,4)))," +
+        "        zplot(table=c))";
+
+    paramsLoc = new ModifiableSolrParams();
+    paramsLoc.set("expr", cexpr);
+    paramsLoc.set("qt", "/stream");
+    solrStream = new SolrStream(url, paramsLoc);
+    context = new StreamContext();
+    solrStream.setStreamContext(context);
+    tuples = getTuples(solrStream);
+    assertTrue(tuples.size() == 2);
+    out = tuples.get(0);
+
+    assertEquals(out.getDouble("a").doubleValue(), 3.0, 0.0);
+    assertEquals(out.getDouble("b").doubleValue(), 5.0, 0.0);
+
+    out = tuples.get(1);
+
+    assertEquals(out.getDouble("a").doubleValue(), 4.0, 0.0);
+    assertEquals(out.getDouble("b").doubleValue(), 6.0, 0.0);
+
+
+    cexpr = "let(a=array(1,2,3,4)," +
+        "        b=array(10,11,12,13),"+
+        "        zplot(x=a, y=b))";
+
+    paramsLoc = new ModifiableSolrParams();
+    paramsLoc.set("expr", cexpr);
+    paramsLoc.set("qt", "/stream");
+    solrStream = new SolrStream(url, paramsLoc);
+    context = new StreamContext();
+    solrStream.setStreamContext(context);
+    tuples = getTuples(solrStream);
+    assertTrue(tuples.size() == 4);
+    out = tuples.get(0);
+
+    assertEquals(out.getDouble("x").doubleValue(), 1.0, 0.0);
+    assertEquals(out.getDouble("y").doubleValue(), 10.0, 0.0);
+
+    out = tuples.get(1);
+
+    assertEquals(out.getDouble("x").doubleValue(), 2.0, 0.0);
+    assertEquals(out.getDouble("y").doubleValue(), 11.0, 0.0);
+
+    out = tuples.get(2);
+
+    assertEquals(out.getDouble("x").doubleValue(), 3.0, 0.0);
+    assertEquals(out.getDouble("y").doubleValue(), 12.0, 0.0);
+
+    out = tuples.get(3);
+
+    assertEquals(out.getDouble("x").doubleValue(), 4.0, 0.0);
+    assertEquals(out.getDouble("y").doubleValue(), 13.0, 0.0);
+
+  }
+
+
   @Test
   public void testMatrixMath() throws Exception {
     String cexpr = "let(echo=true, a=matrix(array(1.5, 2.5, 3.5), array(4.5,5.5,6.5)), " +