You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2018/12/27 19:52:16 UTC
lucene-solr:master: SOLR-13088: Add zplot Stream Evaluator to plot
math expressions in Apache Zeppelin
Repository: lucene-solr
Updated Branches:
refs/heads/master 106d30005 -> d018cd18f
SOLR-13088: Add zplot Stream Evaluator to plot math expressions in Apache Zeppelin
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/d018cd18
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/d018cd18
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/d018cd18
Branch: refs/heads/master
Commit: d018cd18f4470982ffae6e7ff6c7de3ad868bec3
Parents: 106d300
Author: Joel Bernstein <jb...@apache.org>
Authored: Thu Dec 27 14:42:03 2018 -0500
Committer: Joel Bernstein <jb...@apache.org>
Committed: Thu Dec 27 14:42:27 2018 -0500
----------------------------------------------------------------------
.../org/apache/solr/client/solrj/io/Lang.java | 4 +-
.../client/solrj/io/stream/ZplotStream.java | 208 +++++++++++++++++++
.../apache/solr/client/solrj/io/TestLang.java | 3 +-
.../solrj/io/stream/MathExpressionTest.java | 77 +++++++
4 files changed, 290 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d018cd18/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
index 050fa7e..a1a796d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
@@ -92,8 +92,10 @@ public class Lang {
.withFunctionName("tuple", TupStream.class)
.withFunctionName("sql", SqlStream.class)
.withFunctionName("plist", ParallelListStream.class)
+ .withFunctionName("zplot", ZplotStream.class)
- // metrics
+
+ // metrics
.withFunctionName("min", MinMetric.class)
.withFunctionName("max", MaxMetric.class)
.withFunctionName("avg", MeanMetric.class)
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d018cd18/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ZplotStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ZplotStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ZplotStream.java
new file mode 100644
index 0000000..c5280dc
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ZplotStream.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.eval.StreamEvaluator;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public class ZplotStream extends TupleStream implements Expressible {
+ /* Tuple stream that turns its named parameters into output tuples: either several equal-length lists (one output field per parameter) or a single "table" parameter holding a Tuple or a List of Tuples. */
+ private static final long serialVersionUID = 1;
+ private StreamContext streamContext; // stored by setStreamContext(); open() reads the let variables from it
+ private Map letParams = new LinkedHashMap(); // parameter name -> parsed primitive value or StreamEvaluator, kept in insertion order (raw types)
+ private Iterator<Tuple> out; // assigned at the end of open(); no initializer, so null until then
+ /** Parses the named operands of the expression: plain values go through constructPrimitiveObject, evaluator expressions are built as StreamEvaluators. */
+ public ZplotStream(StreamExpression expression, StreamFactory factory) throws IOException {
+
+ List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
+ // Walk every named param and store it in letParams under its own name.
+
+ for(StreamExpressionParameter np : namedParams) {
+ String name = ((StreamExpressionNamedParameter)np).getName();
+ StreamExpressionParameter param = ((StreamExpressionNamedParameter)np).getParameter();
+ if(param instanceof StreamExpressionValue) {
+ String paramValue = ((StreamExpressionValue) param).getValue();
+ letParams.put(name, factory.constructPrimitiveObject(paramValue));
+ } else if(factory.isEvaluator((StreamExpression)param)) { // NOTE(review): assumes every non-value parameter is a StreamExpression (cast is unchecked); an expression that is not an evaluator is skipped without error
+ StreamEvaluator evaluator = factory.constructEvaluator((StreamExpression) param);
+ letParams.put(name, evaluator);
+ }
+ }
+ }
+ /** Converts this stream to an expression by delegating to the private overload with includeStreams=true. */
+ @Override
+ public StreamExpression toExpression(StreamFactory factory) throws IOException{
+ return toExpression(factory, true);
+ }
+ /* Builds an expression carrying only the function name. NOTE(review): includeStreams is never read and letParams are not added back, so the parameters are absent from the result - confirm this is intended. */
+ private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
+ // function name
+ StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+ return expression;
+ }
+ /** Builds a STREAM_DECORATOR explanation whose expression text comes from toExpression(factory, false). */
+ @Override
+ public Explanation toExplanation(StreamFactory factory) throws IOException {
+
+ StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
+ explanation.setFunctionName(factory.getFunctionName(this.getClass()));
+ explanation.setImplementingClass(this.getClass().getName());
+ explanation.setExpressionType(ExpressionType.STREAM_DECORATOR);
+ explanation.setExpression(toExpression(factory, false).toString());
+
+ return explanation;
+ }
+ /** Stores the context; open() later takes the let variables from it and hands it to each evaluator. */
+ public void setStreamContext(StreamContext context) {
+ this.streamContext = context;
+ }
+ /** Returns a new, empty list on every call: this stream reports no child streams. */
+ public List<TupleStream> children() {
+ List<TupleStream> l = new ArrayList<TupleStream>();
+ return l;
+ }
+ /** Returns the next prepared tuple, or a fresh tuple containing only EOF=true once the iterator is exhausted. NOTE(review): dereferences out, which is null if open() has not run. */
+ public Tuple read() throws IOException {
+ if(out.hasNext()) {
+ return out.next();
+ } else {
+ Map m = new HashMap();
+ m.put("EOF", true);
+ Tuple t = new Tuple(m);
+ return t;
+ }
+ }
+ /** No-op: the body is empty. */
+ public void close() throws IOException {
+ }
+ /** Resolves every parameter, validates list lengths and the table rule, then materializes all output tuples and exposes them through the out iterator. */
+ public void open() throws IOException {
+ Map<String, Object> lets = streamContext.getLets();
+ Set<Map.Entry<String, Object>> entries = letParams.entrySet();
+ Map<String, Object> evaluated = new HashMap();
+
+ // Resolve each parameter into the local evaluated map, tracking the shared list length, the parameter count and whether a "table" parameter was seen.
+ int numTuples = -1; // -1 until the first list is seen; afterwards the length every list must match
+ int columns = 0; // number of parameters visited
+ boolean table = false; // set when a parameter is named "table"
+ for(Map.Entry<String, Object> entry : entries) {
+ ++columns;
+
+ String name = entry.getKey();
+ if(name.equals("table")) {
+ table = true;
+ }
+
+ Object o = entry.getValue();
+ if(o instanceof StreamEvaluator) {
+ Tuple eTuple = new Tuple(lets); // the evaluator is run against a tuple built from the let variables
+ StreamEvaluator evaluator = (StreamEvaluator)o;
+ evaluator.setStreamContext(streamContext);
+ Object eo = evaluator.evaluate(eTuple);
+ if(eo instanceof List) {
+ List l = (List)eo;
+ if(numTuples == -1) {
+ numTuples = l.size();
+ } else {
+ if(l.size() != numTuples) {
+ throw new IOException("All lists provided to the zplot function must be the same length.");
+ }
+ }
+ evaluated.put(name, l);
+ } else if (eo instanceof Tuple) {
+ evaluated.put(name, eo);
+ } // a result that is neither a List nor a Tuple is not stored
+ } else { // not an evaluator: the parsed value itself is used as the key into the let variables
+ Object eval = lets.get(o);
+ if(eval instanceof List) {
+ List l = (List)eval;
+ if(numTuples == -1) {
+ numTuples = l.size();
+ } else {
+ if(l.size() != numTuples) {
+ throw new IOException("All lists provided to the zplot function must be the same length.");
+ }
+ }
+ evaluated.put(name, l);
+ } else if(eval instanceof Tuple) {
+ evaluated.put(name, eval);
+ } // a missing let variable (null) or any other type is not stored
+ }
+ }
+
+ if(columns > 1 && table) {
+ throw new IOException("If the table parameter is set there can only be one parameter.");
+ }
+ //Load the values into tuples
+
+ List<Tuple> outTuples = new ArrayList();
+ if(!table) {
+ // Vector mode: output tuple i carries element i of every stored list, keyed by parameter name.
+ for (int i = 0; i < numTuples; i++) { // runs zero times when no list was seen (numTuples is still -1)
+ Tuple tuple = new Tuple(new HashMap());
+ for (String key : evaluated.keySet()) {
+ List l = (List) evaluated.get(key); // NOTE(review): a Tuple stored above under a non-"table" name would fail this cast
+ tuple.put(key, l.get(i));
+ }
+
+ outTuples.add(tuple);
+ }
+ } else {
+ // Table mode: the single "table" value is either a List of Tuples (all emitted) or one Tuple (emitted alone).
+ Object o = evaluated.get("table");
+ if(o instanceof List) {
+ List<Tuple> tuples = (List<Tuple>)o; // unchecked cast: the element types are not verified here
+ outTuples.addAll(tuples);
+ } else if(o instanceof Tuple) {
+ outTuples.add((Tuple)o);
+ }
+ }
+
+ this.out = outTuples.iterator();
+ }
+
+ /** Return the stream sort, i.e. the order in which records are returned; this implementation always returns null. */
+ public StreamComparator getStreamSort(){
+ return null;
+ }
+
+ public int getCost() { // always reports 0
+ return 0;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d018cd18/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
index 3b238c2..b5b7317 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
@@ -73,7 +73,8 @@ public class TestLang extends LuceneTestCase {
"outliers", "stream", "getCache", "putCache", "listCache", "removeCache", "zscores", "latlonVectors",
"convexHull", "getVertices", "getBaryCenter", "getArea", "getBoundarySize","oscillate",
"getAmplitude", "getPhase", "getAngularFrequency", "enclosingDisk", "getCenter", "getRadius",
- "getSupportPoints", "pairSort", "log10", "plist", "recip", "pivot", "ltrim", "rtrim", "export"};
+ "getSupportPoints", "pairSort", "log10", "plist", "recip", "pivot", "ltrim", "rtrim", "export",
+ "zplot"};
@Test
public void testLang() {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d018cd18/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
index 8ac184a..8e973d2 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
@@ -1356,6 +1356,83 @@ public class MathExpressionTest extends SolrCloudTestCase {
assertTrue(tuples.get(0).getLong("i")== 2);
}
+
+ @Test
+ public void testZplot() throws Exception { // runs three zplot expressions through /stream: a single-tuple table, a list-of-tuples table, and two paired arrays
+ String cexpr = "let(c=tuple(a=add(1,2), b=add(2,3))," +
+ " zplot(table=c))";
+ // Case 1 (expression above): table bound to one tuple; exactly one output tuple is expected.
+ ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
+ paramsLoc.set("expr", cexpr);
+ paramsLoc.set("qt", "/stream");
+ String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS;
+ TupleStream solrStream = new SolrStream(url, paramsLoc);
+ StreamContext context = new StreamContext();
+ solrStream.setStreamContext(context);
+ List<Tuple> tuples = getTuples(solrStream);
+ assertTrue(tuples.size() == 1);
+ Tuple out = tuples.get(0);
+ // NOTE(review): here and below the arguments are passed as (actual, expected, delta); presumably JUnit's assertEquals, whose convention is expected first - only failure messages are affected.
+ assertEquals(out.getDouble("a").doubleValue(), 3.0, 0.0);
+ assertEquals(out.getDouble("b").doubleValue(), 5.0, 0.0);
+ // Case 2: table bound to a list of two tuples; two output tuples are expected, checked in list order.
+ cexpr = "let(c=list(tuple(a=add(1,2), b=add(2,3)), tuple(a=add(1,3), b=add(2,4)))," +
+ " zplot(table=c))";
+
+ paramsLoc = new ModifiableSolrParams();
+ paramsLoc.set("expr", cexpr);
+ paramsLoc.set("qt", "/stream");
+ solrStream = new SolrStream(url, paramsLoc);
+ context = new StreamContext();
+ solrStream.setStreamContext(context);
+ tuples = getTuples(solrStream);
+ assertTrue(tuples.size() == 2);
+ out = tuples.get(0);
+
+ assertEquals(out.getDouble("a").doubleValue(), 3.0, 0.0);
+ assertEquals(out.getDouble("b").doubleValue(), 5.0, 0.0);
+
+ out = tuples.get(1);
+
+ assertEquals(out.getDouble("a").doubleValue(), 4.0, 0.0);
+ assertEquals(out.getDouble("b").doubleValue(), 6.0, 0.0);
+
+ // Case 3: two arrays of length four passed as x and y; one output tuple per index is expected.
+ cexpr = "let(a=array(1,2,3,4)," +
+ " b=array(10,11,12,13),"+
+ " zplot(x=a, y=b))";
+
+ paramsLoc = new ModifiableSolrParams();
+ paramsLoc.set("expr", cexpr);
+ paramsLoc.set("qt", "/stream");
+ solrStream = new SolrStream(url, paramsLoc);
+ context = new StreamContext();
+ solrStream.setStreamContext(context);
+ tuples = getTuples(solrStream);
+ assertTrue(tuples.size() == 4);
+ out = tuples.get(0);
+ // Each tuple i must pair element i of a (as x) with element i of b (as y).
+ assertEquals(out.getDouble("x").doubleValue(), 1.0, 0.0);
+ assertEquals(out.getDouble("y").doubleValue(), 10.0, 0.0);
+
+ out = tuples.get(1);
+
+ assertEquals(out.getDouble("x").doubleValue(), 2.0, 0.0);
+ assertEquals(out.getDouble("y").doubleValue(), 11.0, 0.0);
+
+ out = tuples.get(2);
+
+ assertEquals(out.getDouble("x").doubleValue(), 3.0, 0.0);
+ assertEquals(out.getDouble("y").doubleValue(), 12.0, 0.0);
+
+ out = tuples.get(3);
+
+ assertEquals(out.getDouble("x").doubleValue(), 4.0, 0.0);
+ assertEquals(out.getDouble("y").doubleValue(), 13.0, 0.0);
+
+ }
+
+
@Test
public void testMatrixMath() throws Exception {
String cexpr = "let(echo=true, a=matrix(array(1.5, 2.5, 3.5), array(4.5,5.5,6.5)), " +