You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by cp...@apache.org on 2017/05/04 17:10:08 UTC
[14/50] lucene-solr:jira/solr-8668: SOLR-10559: Cleaner syntax
SOLR-10559: Cleaner syntax
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/e57fab17
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/e57fab17
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/e57fab17
Branch: refs/heads/jira/solr-8668
Commit: e57fab17c0d440cfa9e54d87001bc9691e8ed53d
Parents: 64caf17
Author: Joel Bernstein <jb...@apache.org>
Authored: Thu Apr 27 16:30:46 2017 -0400
Committer: Joel Bernstein <jb...@apache.org>
Committed: Thu Apr 27 17:03:29 2017 -0400
----------------------------------------------------------------------
.../org/apache/solr/handler/StreamHandler.java | 2 +
.../solr/client/solrj/io/eval/AddEvaluator.java | 4 +-
.../client/solrj/io/eval/NumberEvaluator.java | 13 ++
.../client/solrj/io/stream/ColumnEvaluator.java | 78 +++++++++
.../solr/client/solrj/io/stream/LetStream.java | 95 ++++++-----
.../client/solrj/io/stream/StreamContext.java | 4 +-
.../solr/client/solrj/io/stream/TupStream.java | 165 +++++++++++++++++++
.../solrj/io/stream/expr/StreamFactory.java | 13 +-
.../solrj/io/stream/StreamExpressionTest.java | 90 +++++-----
.../solrj/io/stream/eval/AddEvaluatorTest.java | 5 -
10 files changed, 371 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e57fab17/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index c750ce9..5414389 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -166,6 +166,8 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
.withFunctionName("let", LetStream.class)
.withFunctionName("get", GetStream.class)
.withFunctionName("timeseries", TimeSeriesStream.class)
+ .withFunctionName("tuple", TupStream.class)
+ .withFunctionName("col", ColumnEvaluator.class)
// metrics
.withFunctionName("min", MinMetric.class)
.withFunctionName("max", MaxMetric.class)
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e57fab17/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/AddEvaluator.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/AddEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/AddEvaluator.java
index 317741e..94a9280 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/AddEvaluator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/AddEvaluator.java
@@ -34,8 +34,8 @@ public class AddEvaluator extends NumberEvaluator {
public AddEvaluator(StreamExpression expression, StreamFactory factory) throws IOException{
super(expression, factory);
- if(subEvaluators.size() < 2){
- throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting at least two values but found %d",expression,subEvaluators.size()));
+ if(subEvaluators.size() < 1){
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting at least 1 value but found %d",expression,subEvaluators.size()));
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e57fab17/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/NumberEvaluator.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/NumberEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/NumberEvaluator.java
index f4491fd..d7c26b0 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/NumberEvaluator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/NumberEvaluator.java
@@ -50,6 +50,19 @@ public abstract class NumberEvaluator extends ComplexEvaluator {
}
else if(result instanceof Number){
results.add(new BigDecimal(result.toString()));
+ } else if(result instanceof List) {
+ List l = (List) result;
+ for(Object o : l) {
+ if(o instanceof Number) {
+ results.add(new BigDecimal(o.toString()));
+ } else {
+ String message = String.format(Locale.ROOT,"Failed to evaluate to a numeric value - evaluator '%s' resulted in type '%s' and value '%s'",
+ subEvaluator.toExpression(constructingFactory),
+ o.getClass().getName(),
+ o.toString());
+ throw new IOException(message);
+ }
+ }
}
else{
String message = String.format(Locale.ROOT,"Failed to evaluate to a numeric value - evaluator '%s' resulted in type '%s' and value '%s'",
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e57fab17/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ColumnEvaluator.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ColumnEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ColumnEvaluator.java
new file mode 100644
index 0000000..eccd15c
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ColumnEvaluator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.eval.SimpleEvaluator;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public class ColumnEvaluator extends SimpleEvaluator implements Expressible {
+
+ private static final long serialVersionUID = 1;
+ private String name;
+ private String fieldName;
+ ;
+ public ColumnEvaluator(StreamExpression expression, StreamFactory factory) throws IOException {
+ String name = factory.getValueOperand(expression, 0);
+ String fieldName = factory.getValueOperand(expression, 1);
+ init(name, fieldName);
+ }
+
+ private void init(String name, String fieldName) {
+ this.name = name;
+ this.fieldName = fieldName;
+ }
+
+ public List<Number> evaluate(Tuple tuple) throws IOException {
+ List<Tuple> tuples = (List<Tuple>)tuple.get(name);
+ List<Number> column = new ArrayList(tuples.size());
+ for(Tuple t : tuples) {
+ System.out.println("###### Field:"+fieldName);
+ Object o = t.get(fieldName);
+ if(o instanceof Number) {
+ column.add((Number)o);
+ } else {
+ throw new IOException("Found non-numeric in column:"+o.toString());
+ }
+ }
+ return column;
+ }
+
+ @Override
+ public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+ StreamExpression expression = new StreamExpression(factory.getFunctionName(getClass()));
+ return expression;
+ }
+
+ @Override
+ public Explanation toExplanation(StreamFactory factory) throws IOException {
+ return new Explanation(nodeId.toString())
+ .withExpressionType(ExpressionType.EVALUATOR)
+ .withFunctionName(factory.getFunctionName(getClass()))
+ .withImplementingClass(getClass().getName())
+ .withExpression(toExpression(factory).toString());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e57fab17/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java
index 3a17211..8c9f02b 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java
@@ -18,61 +18,56 @@ package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Set;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.eval.StreamEvaluator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class LetStream extends TupleStream implements Expressible {
private static final long serialVersionUID = 1;
private TupleStream stream;
- private List<CellStream> cellStreams;
private StreamContext streamContext;
-
- public LetStream(TupleStream stream, List<CellStream> cellStreams) throws IOException {
- init(stream, cellStreams);
- }
+ private Map letParams = new HashMap();
public LetStream(StreamExpression expression, StreamFactory factory) throws IOException {
List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
- if(streamExpressions.size() < 2){
- throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting atleast 2 streams but found %d",expression, streamExpressions.size()));
- }
-
- TupleStream stream = null;
- List<CellStream> cellStreams = new ArrayList();
-
- for(StreamExpression streamExpression : streamExpressions) {
- TupleStream s = factory.constructStream(streamExpression);
- if(s instanceof CellStream) {
- cellStreams.add((CellStream)s);
+ List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
+ //Get all the named params
+ for(StreamExpressionParameter np : namedParams) {
+ String name = ((StreamExpressionNamedParameter)np).getName();
+ StreamExpressionParameter param = ((StreamExpressionNamedParameter)np).getParameter();
+ if(factory.isEvaluator((StreamExpression)param)) {
+ StreamEvaluator evaluator = factory.constructEvaluator((StreamExpression) param);
+ letParams.put(name, evaluator);
} else {
- if(stream == null) {
- stream = s;
- } else {
- throw new IOException("Found more then one stream that was not a CellStream");
- }
+ TupleStream tupleStream = factory.constructStream((StreamExpression) param);
+ letParams.put(name, tupleStream);
}
}
- init(stream, cellStreams);
- }
+ if(streamExpressions.size() != 1){
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting 1 stream but found %d",expression, streamExpressions.size()));
+ }
- private void init(TupleStream _stream, List<CellStream> _cellStreams) {
- this.stream = _stream;
- this.cellStreams = _cellStreams;
+ stream = factory.constructStream(streamExpressions.get(0));
}
+
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException{
return toExpression(factory, true);
@@ -82,9 +77,6 @@ public class LetStream extends TupleStream implements Expressible {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
expression.addParameter(((Expressible) stream).toExpression(factory));
- for(CellStream cellStream : cellStreams) {
- expression.addParameter(((Expressible)cellStream).toExpression(factory));
- }
return expression;
}
@@ -123,17 +115,40 @@ public class LetStream extends TupleStream implements Expressible {
}
public void open() throws IOException {
- Map<String, List<Tuple>> lets = streamContext.getLets();
- for(CellStream cellStream : cellStreams) {
- try {
- cellStream.setStreamContext(streamContext);
- cellStream.open();
- Tuple tup = cellStream.read();
- String name = cellStream.getName();
- List<Tuple> tuples = (List<Tuple>)tup.get(name);
- lets.put(name, tuples);
- } finally {
- cellStream.close();
+ Map<String, Object> lets = streamContext.getLets();
+ Set<Map.Entry<String, Object>> entries = letParams.entrySet();
+
+ //Load up the StreamContext with the data created by the letParams.
+ for(Map.Entry<String, Object> entry : entries) {
+ String name = entry.getKey();
+ Object o = entry.getValue();
+ if(o instanceof TupleStream) {
+ List<Tuple> tuples = new ArrayList();
+ TupleStream tStream = (TupleStream)o;
+ tStream.setStreamContext(streamContext);
+ try {
+ tStream.open();
+ TUPLES:
+ while(true) {
+ Tuple tuple = tStream.read();
+ if (tuple.EOF) {
+ break TUPLES;
+ } else {
+ tuples.add(tuple);
+ }
+ }
+ lets.put(name, tuples);
+ } finally {
+ tStream.close();
+ }
+ } else {
+ //Add the data from the StreamContext to a tuple.
+ //Let the evaluator work from this tuple.
+ //This will allow columns to be created from tuples already in the StreamContext.
+ Tuple eTuple = new Tuple(lets);
+ StreamEvaluator evaluator = (StreamEvaluator)o;
+ Object eo = evaluator.evaluate(eTuple);
+ lets.put(name, eo);
}
}
stream.open();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e57fab17/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
index 5dcc7b3..190ecdc 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
@@ -39,14 +39,14 @@ public class StreamContext implements Serializable{
private Map entries = new HashMap();
private Map tupleContext = new HashMap();
- private Map<String, List<Tuple>> lets = new HashMap();
+ private Map<String, Object> lets = new HashMap();
public int workerID;
public int numWorkers;
private SolrClientCache clientCache;
private ModelCache modelCache;
private StreamFactory streamFactory;
- public Map<String, List<Tuple>> getLets(){
+ public Map<String, Object> getLets(){
return lets;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e57fab17/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
new file mode 100644
index 0000000..955e128
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.eval.StreamEvaluator;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public class TupStream extends TupleStream implements Expressible {
+
+ private static final long serialVersionUID = 1;
+ private StreamContext streamContext;
+ private Map tupleParams = new HashMap();
+ private boolean finished;
+
+ public TupStream(StreamExpression expression, StreamFactory factory) throws IOException {
+
+ List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
+ //Get all the named params
+ for(StreamExpressionParameter np : namedParams) {
+ String name = ((StreamExpressionNamedParameter)np).getName();
+ StreamExpressionParameter param = ((StreamExpressionNamedParameter)np).getParameter();
+
+ if(param instanceof StreamExpressionValue) {
+ tupleParams.put(name, ((StreamExpressionValue)param).getValue());
+ } else {
+ if (factory.isEvaluator((StreamExpression) param)) {
+ StreamEvaluator evaluator = factory.constructEvaluator((StreamExpression) param);
+ tupleParams.put(name, evaluator);
+ } else {
+ TupleStream tupleStream = factory.constructStream((StreamExpression) param);
+ tupleParams.put(name, tupleStream);
+ }
+ }
+ }
+ }
+
+ @Override
+ public StreamExpression toExpression(StreamFactory factory) throws IOException{
+ return toExpression(factory, true);
+ }
+
+ private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
+ // function name
+ StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+ return expression;
+ }
+
+ @Override
+ public Explanation toExplanation(StreamFactory factory) throws IOException {
+
+ StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
+ explanation.setFunctionName(factory.getFunctionName(this.getClass()));
+ explanation.setImplementingClass(this.getClass().getName());
+ explanation.setExpressionType(ExpressionType.STREAM_DECORATOR);
+ explanation.setExpression(toExpression(factory, false).toString());
+
+ return explanation;
+ }
+
+ public void setStreamContext(StreamContext context) {
+ this.streamContext = context;
+ }
+
+ public List<TupleStream> children() {
+ List<TupleStream> l = new ArrayList<TupleStream>();
+ return l;
+ }
+
+ public Tuple read() throws IOException {
+
+ if(finished) {
+ Map m = new HashMap();
+ m.put("EOF", true);
+ return new Tuple(m);
+ } else {
+ finished = true;
+ Map<String, Object> map = new HashMap();
+ Set<Map.Entry<String, Object>> entries = tupleParams.entrySet();
+
+ for (Map.Entry<String, Object> entry : entries) {
+ String name = entry.getKey();
+ Object o = entry.getValue();
+ if (o instanceof TupleStream) {
+ List<Tuple> tuples = new ArrayList();
+ TupleStream tStream = (TupleStream) o;
+ tStream.setStreamContext(streamContext);
+ try {
+ tStream.open();
+ TUPLES:
+ while (true) {
+ Tuple tuple = tStream.read();
+ if (tuple.EOF) {
+ break TUPLES;
+ } else {
+ tuples.add(tuple);
+ }
+ }
+ map.put(name, tuples);
+ } finally {
+ tStream.close();
+ }
+ } else if ((o instanceof StreamEvaluator)) {
+ Tuple eTuple = new Tuple(streamContext.getLets());
+ StreamEvaluator evaluator = (StreamEvaluator) o;
+ Object eo = evaluator.evaluate(eTuple);
+ map.put(name, eo);
+ } else {
+ map.put(name, streamContext.getLets().get(o.toString()));
+ }
+ }
+ return new Tuple(map);
+ }
+ }
+
+ public void close() throws IOException {
+ }
+
+ public void open() throws IOException {
+
+ }
+
+ /** Return the stream sort - ie, the order in which records are returned */
+ public StreamComparator getStreamSort(){
+ return null;
+ }
+
+ public int getCost() {
+ return 0;
+ }
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e57fab17/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
index f57319d..f03bf48 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
@@ -90,7 +90,7 @@ public class StreamFactory implements Serializable {
}
public List<String> getValueOperands(StreamExpression expression){
- return getOperandsOfType(expression, StreamExpressionValue.class).stream().map(item -> ((StreamExpressionValue)item).getValue()).collect(Collectors.toList());
+ return getOperandsOfType(expression, StreamExpressionValue.class).stream().map(item -> ((StreamExpressionValue) item).getValue()).collect(Collectors.toList());
}
/** Given an expression, will return the value parameter at the given index, or null if doesn't exist */
@@ -377,6 +377,17 @@ public class StreamFactory implements Serializable {
throw new IOException(String.format(Locale.ROOT,"Invalid evaluator expression %s - function '%s' is unknown (not mapped to a valid StreamEvaluator)", expression, expression.getFunctionName()));
}
+ public boolean isEvaluator(StreamExpression expression) throws IOException{
+ String function = expression.getFunctionName();
+ if(functionNames.containsKey(function)){
+ Class<? extends Expressible> clazz = functionNames.get(function);
+ if(Expressible.class.isAssignableFrom(clazz) && StreamEvaluator.class.isAssignableFrom(clazz)){
+ return true;
+ }
+ }
+
+ return false;
+ }
public <T> T createInstance(Class<T> clazz, Class<?>[] paramTypes, Object[] params) throws IOException{
Constructor<T> ctor;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e57fab17/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index c459779..e9de18a 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -5111,6 +5111,8 @@ public class StreamExpressionTest extends SolrCloudTestCase {
+
+
@Test
public void testListStream() throws Exception {
UpdateRequest updateRequest = new UpdateRequest();
@@ -5174,15 +5176,16 @@ public class StreamExpressionTest extends SolrCloudTestCase {
}
@Test
- public void testLetGetStream() throws Exception {
+ public void testTupleStream() throws Exception {
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.add(id, "hello", "test_t", "l b c d c e");
updateRequest.add(id, "hello1", "test_t", "l b c d c");
-
updateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
String expr = "search("+COLLECTIONORALIAS+", q=\"*:*\", fl=\"id,test_t\", sort=\"id desc\")";
- String cat = "let(cell(results,"+expr+"), get(results))";
+
+ //Add a Stream and an Evaluator to the Tuple.
+ String cat = "tuple(results="+expr+", sum=add(1,1))";
ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", cat);
paramsLoc.set("qt", "/stream");
@@ -5193,60 +5196,51 @@ public class StreamExpressionTest extends SolrCloudTestCase {
StreamContext context = new StreamContext();
solrStream.setStreamContext(context);
List<Tuple> tuples = getTuples(solrStream);
- assertTrue(tuples.size() == 2);
- assertTrue(tuples.get(0).get("id").equals("hello1"));
- assertTrue(tuples.get(0).get("test_t").equals("l b c d c"));
- assertTrue(tuples.get(1).get("id").equals("hello"));
- assertTrue(tuples.get(1).get("test_t").equals("l b c d c e"));
+ assertTrue(tuples.size() == 1);
+ List<Map> results = (List<Map>)tuples.get(0).get("results");
+ assertTrue(results.get(0).get("id").equals("hello1"));
+ assertTrue(results.get(0).get("test_t").equals("l b c d c"));
+ assertTrue(results.get(1).get("id").equals("hello"));
+ assertTrue(results.get(1).get("test_t").equals("l b c d c e"));
+ assertTrue(tuples.get(0).getLong("sum").equals(2L));
- //Test there are no side effects when transforming tuples.
- expr = "search("+COLLECTIONORALIAS+", q=\"*:*\", fl=\"id,test_t\", sort=\"id desc\")";
- cat = "let(cell(results,"+expr+"), list(select(get(results), id as newid, test_t), get(results)))";
- paramsLoc = new ModifiableSolrParams();
- paramsLoc.set("expr", cat);
- paramsLoc.set("qt", "/stream");
+ }
- solrStream = new SolrStream(url, paramsLoc);
+ @Test
+ public void testLetStream() throws Exception {
+ UpdateRequest updateRequest = new UpdateRequest();
+ updateRequest.add(id, "hello", "test_t", "l b c d c e", "test_i", "5");
+ updateRequest.add(id, "hello1", "test_t", "l b c d c", "test_i", "4");
- context = new StreamContext();
- solrStream.setStreamContext(context);
- tuples = getTuples(solrStream);
- assertTrue(tuples.size() == 4);
- assertTrue(tuples.get(0).get("newid").equals("hello1"));
- assertTrue(tuples.get(0).get("test_t").equals("l b c d c"));
- assertTrue(tuples.get(1).get("newid").equals("hello"));
- assertTrue(tuples.get(1).get("test_t").equals("l b c d c e"));
- assertTrue(tuples.get(2).get("id").equals("hello1"));
- assertTrue(tuples.get(2).get("test_t").equals("l b c d c"));
- assertTrue(tuples.get(3).get("id").equals("hello"));
- assertTrue(tuples.get(3).get("test_t").equals("l b c d c e"));
-
- //Test multiple lets
-
- //Test there are no side effects when transforming tuples.
- expr = "search("+COLLECTIONORALIAS+", q=\"*:*\", fl=\"id,test_t\", sort=\"id desc\")";
- String expr1 = "search("+COLLECTIONORALIAS+", q=\"*:*\", fl=\"id,test_t\", sort=\"id asc\")";
-
- cat = "let(cell(results,"+expr+"), cell(results1,"+expr1+"), list(select(get(results), id as newid, test_t), get(results1)))";
- paramsLoc = new ModifiableSolrParams();
+ updateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+ String expr = "search("+COLLECTIONORALIAS+", q=\"*:*\", fl=\"id,test_t, test_i\", sort=\"id desc\")";
+ String cat = "let(a="+expr+", b=add(1,3), c=col(a, test_i), tuple(test=add(1,1), test1=b, results=a, test2=add(c)))";
+ ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", cat);
paramsLoc.set("qt", "/stream");
- solrStream = new SolrStream(url, paramsLoc);
+ String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS;
+ TupleStream solrStream = new SolrStream(url, paramsLoc);
- context = new StreamContext();
+ StreamContext context = new StreamContext();
solrStream.setStreamContext(context);
- tuples = getTuples(solrStream);
- assertTrue(tuples.size() == 4);
- assertTrue(tuples.get(0).get("newid").equals("hello1"));
- assertTrue(tuples.get(0).get("test_t").equals("l b c d c"));
- assertTrue(tuples.get(1).get("newid").equals("hello"));
- assertTrue(tuples.get(1).get("test_t").equals("l b c d c e"));
- assertTrue(tuples.get(2).get("id").equals("hello"));
- assertTrue(tuples.get(2).get("test_t").equals("l b c d c e"));
- assertTrue(tuples.get(3).get("id").equals("hello1"));
- assertTrue(tuples.get(3).get("test_t").equals("l b c d c"));
+ List<Tuple> tuples = getTuples(solrStream);
+ assertTrue(tuples.size() == 1);
+ Tuple tuple1 = tuples.get(0);
+ List<Map> results = (List<Map>)tuple1.get("results");
+ assertTrue(results.size() == 2);
+ assertTrue(results.get(0).get("id").equals("hello1"));
+ assertTrue(results.get(0).get("test_t").equals("l b c d c"));
+ assertTrue(results.get(1).get("id").equals("hello"));
+ assertTrue(results.get(1).get("test_t").equals("l b c d c e"));
+
+ assertTrue(tuple1.getLong("test").equals(2L));
+ assertTrue(tuple1.getLong("test1").equals(4L));
+ assertTrue(tuple1.getLong("test2").equals(9L));
+
+
}
@Test
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e57fab17/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/AddEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/AddEvaluatorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/AddEvaluatorTest.java
index ac31acc..a8df004 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/AddEvaluatorTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/AddEvaluatorTest.java
@@ -68,11 +68,6 @@ public class AddEvaluatorTest extends LuceneTestCase {
Assert.assertTrue(result instanceof Double);
Assert.assertEquals(3.2D, result);
}
-
- @Test(expected = IOException.class)
- public void addOneField() throws Exception{
- factory.constructEvaluator("add(a)");
- }
@Test
public void addTwoFieldWithNulls() throws Exception{