You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by dp...@apache.org on 2017/04/29 01:50:33 UTC
lucene-solr:master: SOLR-10559: Updates TupStream and enhances
evaluators to work over values in the StreamContext
Repository: lucene-solr
Updated Branches:
refs/heads/master 7f6f68c7f -> 460b3b36e
SOLR-10559: Updates TupStream and enhances evaluators to work over values in the StreamContext
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/460b3b36
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/460b3b36
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/460b3b36
Branch: refs/heads/master
Commit: 460b3b36e9544b4be75cf1d25c89d8bde8ec5c74
Parents: 7f6f68c
Author: Dennis Gove <dp...@gmail.com>
Authored: Fri Apr 28 21:45:56 2017 -0400
Committer: Dennis Gove <dp...@gmail.com>
Committed: Fri Apr 28 21:45:56 2017 -0400
----------------------------------------------------------------------
.../org/apache/solr/client/solrj/io/Tuple.java | 4 +
.../solrj/io/comp/SingleValueComparator.java | 65 +++++++++
.../client/solrj/io/eval/ComplexEvaluator.java | 7 +
.../client/solrj/io/eval/FieldEvaluator.java | 17 ++-
.../client/solrj/io/eval/SimpleEvaluator.java | 3 +
.../client/solrj/io/eval/StreamEvaluator.java | 26 +++-
.../solr/client/solrj/io/stream/TupStream.java | 145 ++++++++++++-------
.../solrj/io/stream/expr/StreamFactory.java | 12 ++
.../stream/eval/AbsoluteValueEvaluatorTest.java | 33 ++++-
9 files changed, 260 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/460b3b36/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
index fdf44c9..d82c864 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
@@ -48,6 +48,10 @@ public class Tuple implements Cloneable, MapWriter {
public List<String> fieldNames;
public Map<String, String> fieldLabels;
+ public Tuple(){
+ // intentionally empty: builds a tuple without reading anything from a fields map
+ }
+
public Tuple(Map fields) {
if(fields.containsKey("EOF")) {
EOF = true;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/460b3b36/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/SingleValueComparator.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/SingleValueComparator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/SingleValueComparator.java
new file mode 100644
index 0000000..c429072
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/SingleValueComparator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.comp;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ * A placeholder Comparator for a stream that will only ever return a single value,
+ * ie, it has no sorted order; compare() is constant and never inspects the tuples
+ **/
+public class SingleValueComparator implements StreamComparator {
+
+ private static final long serialVersionUID = 1;
+ private UUID comparatorNodeId = UUID.randomUUID();
+
+ public StreamExpressionParameter toExpression(StreamFactory factory){
+ return null;
+ }
+
+ @Override
+ public Explanation toExplanation(StreamFactory factory) throws IOException {
+ return null;
+ }
+
+ public int compare(Tuple leftTuple, Tuple rightTuple) {
+ return -1; // constant: always ranks left before right. NOTE(review): not antisymmetric, confirm no caller sorts with this
+ }
+
+ @Override
+ public boolean isDerivedFrom(StreamComparator base){
+ // no ordering is imposed here, so any base comparator is reported as a match
+ return true;
+ }
+
+ @Override
+ public SingleValueComparator copyAliased(Map<String,String> aliases){
+ return this;
+ }
+
+ @Override
+ public StreamComparator append(StreamComparator other){
+ return other;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/460b3b36/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/ComplexEvaluator.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/ComplexEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/ComplexEvaluator.java
index 59a4653..ea4c88c 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/ComplexEvaluator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/ComplexEvaluator.java
@@ -101,5 +101,12 @@ public abstract class ComplexEvaluator implements StreamEvaluator {
public void setStreamContext(StreamContext context) {
this.streamContext = context;
+
+ for(StreamEvaluator subEvaluator : subEvaluators){
+ subEvaluator.setStreamContext(context);
+ }
+ }
+ public StreamContext getStreamContext(){
+ return streamContext;
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/460b3b36/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/FieldEvaluator.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/FieldEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/FieldEvaluator.java
index 3251498..501e9d5 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/FieldEvaluator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/FieldEvaluator.java
@@ -44,9 +44,24 @@ public class FieldEvaluator extends SimpleEvaluator {
}
@Override
- public Object evaluate(Tuple tuple) {
+ public Object evaluate(Tuple tuple) throws IOException {
Object value = tuple.get(fieldName);
+ // This is somewhat radical.
+ // Here, we allow for the use of the context to provide alternative values
+ // when they are not available in the provided tuple. This means that all
+ // evaluators can evaluate over both a stream's tuple and the context, and
+ // can even evaluate over fields from both of them in the same evaluation
+ if(null == value && null != getStreamContext()){
+ value = getStreamContext().getLets().get(fieldName);
+
+ // If what's contained in the context is itself an evaluator then
+ // we need to evaluate it
+ if(value instanceof StreamEvaluator){
+ value = ((StreamEvaluator)value).evaluate(tuple);
+ }
+ }
+
// if we have an array then convert to an ArrayList
// if we have an iterable that is not a list then convert to ArrayList
// lists are good to go
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/460b3b36/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/SimpleEvaluator.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/SimpleEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/SimpleEvaluator.java
index 5ee1715..4a095f8 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/SimpleEvaluator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/SimpleEvaluator.java
@@ -32,5 +32,8 @@ public abstract class SimpleEvaluator implements StreamEvaluator {
public void setStreamContext(StreamContext streamContext) {
this.streamContext = streamContext;
}
+ public StreamContext getStreamContext(){
+ return streamContext;
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/460b3b36/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/StreamEvaluator.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/StreamEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/StreamEvaluator.java
index 1774c46..e82d5d3 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/StreamEvaluator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/StreamEvaluator.java
@@ -27,6 +27,30 @@ import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
public interface StreamEvaluator extends Expressible, Serializable {
- Object evaluate(final Tuple tuple) throws IOException;
void setStreamContext(StreamContext streamContext);
+ StreamContext getStreamContext();
+
+ Object evaluate(final Tuple tuple) throws IOException;
+
+ /**
+ * Execute the evaluator over lets stored within the StreamContext. This allows
+ * evaluators to be executed over values calculated elsewhere in the pipeline
+ * and stored in the {@link StreamContext#getLets() streamContext.lets}.
+ *
+ * Default implementation just creates a tuple out of all values in the context
+ * and passes that to {@link StreamEvaluator#evaluate(Tuple)}.
+ *
+ * @return Evaluated value, or null when no StreamContext has been set
+ * @throws IOException thrown on error during evaluation
+ */
+ default Object evaluateOverContext() throws IOException{
+ StreamContext context = getStreamContext();
+ if(null != context){
+ Tuple contextTuple = new Tuple(context.getLets());
+ return evaluate(contextTuple);
+ }
+
+ return null;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/460b3b36/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
index 57c7b76..8a71ae6 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
@@ -20,10 +20,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
-import java.util.Set;
+import java.util.Map.Entry;
import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.SingleValueComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eval.StreamEvaluator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
@@ -40,27 +42,34 @@ public class TupStream extends TupleStream implements Expressible {
private static final long serialVersionUID = 1;
private StreamContext streamContext;
- private Map tupleParams = new HashMap();
+
+ private Map<String,String> stringParams = new HashMap<>();
+ private Map<String,StreamEvaluator> evaluatorParams = new HashMap<>();
+ private Map<String,TupleStream> streamParams = new HashMap<>();
+
private boolean finished;
public TupStream(StreamExpression expression, StreamFactory factory) throws IOException {
List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
//Get all the named params
- for(StreamExpressionParameter np : namedParams) {
- String name = ((StreamExpressionNamedParameter)np).getName();
- StreamExpressionParameter param = ((StreamExpressionNamedParameter)np).getParameter();
+ for(StreamExpressionNamedParameter np : namedParams) {
+ String name = np.getName();
+ StreamExpressionParameter param = np.getParameter();
+ // classify each param once here (string / evaluator / stream) so read() never re-checks its type
+ // order of these in read() doesn't matter
if(param instanceof StreamExpressionValue) {
- tupleParams.put(name, ((StreamExpressionValue)param).getValue());
- } else {
- if (factory.isEvaluator((StreamExpression) param)) {
- StreamEvaluator evaluator = factory.constructEvaluator((StreamExpression) param);
- tupleParams.put(name, evaluator);
- } else {
- TupleStream tupleStream = factory.constructStream((StreamExpression) param);
- tupleParams.put(name, tupleStream);
- }
+ stringParams.put(name, ((StreamExpressionValue)param).getValue());
+ } else if (factory.isEvaluator((StreamExpression) param)) {
+ StreamEvaluator evaluator = factory.constructEvaluator((StreamExpression) param);
+ evaluatorParams.put(name, evaluator);
+ } else if(factory.isStream((StreamExpression)param)) {
+ TupleStream tupleStream = factory.constructStream((StreamExpression) param);
+ streamParams.put(name, tupleStream);
+ }
+ else{
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - only string, evaluator, or stream named parameters are supported, but param %s is none of those",expression, name));
}
}
}
@@ -73,6 +82,26 @@ public class TupStream extends TupleStream implements Expressible {
private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+ // add string based params
+ for(Entry<String,String> param : stringParams.entrySet()){
+ expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), param.getValue()));
+ }
+
+ // add evaluator based params
+ for(Entry<String,StreamEvaluator> param : evaluatorParams.entrySet()){
+ expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), param.getValue().toExpression(factory)));
+ }
+
+ // add stream based params
+ for(Entry<String,TupleStream> param : streamParams.entrySet()){
+ if(includeStreams){
+ expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), ((Expressible)param.getValue()).toExpression(factory)));
+ }
+ else{
+ expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), "<stream>"));
+ }
+ }
return expression;
}
@@ -91,6 +120,15 @@ public class TupStream extends TupleStream implements Expressible {
public void setStreamContext(StreamContext context) {
this.streamContext = context;
+
+ // propagate the same context to every evaluator and stream parameter
+ for(StreamEvaluator evaluator : evaluatorParams.values()){
+ evaluator.setStreamContext(context);
+ }
+
+ for(TupleStream stream : streamParams.values()){
+ stream.setStreamContext(context);
+ }
}
public List<TupleStream> children() {
@@ -101,59 +139,68 @@ public class TupStream extends TupleStream implements Expressible {
public Tuple read() throws IOException {
if(finished) {
- Map m = new HashMap();
+ Map<String,Object> m = new HashMap<>();
m.put("EOF", true);
return new Tuple(m);
} else {
finished = true;
- Map<String, Object> map = new HashMap();
- Set<Map.Entry<String, Object>> entries = tupleParams.entrySet();
-
- for (Map.Entry<String, Object> entry : entries) {
- String name = entry.getKey();
- Object o = entry.getValue();
- if (o instanceof TupleStream) {
- List<Tuple> tuples = new ArrayList();
- TupleStream tStream = (TupleStream) o;
- tStream.setStreamContext(streamContext);
- try {
- tStream.open();
- TUPLES:
- while (true) {
- Tuple tuple = tStream.read();
- if (tuple.EOF) {
- break TUPLES;
- } else {
- tuples.add(tuple);
- }
- }
- map.put(name, tuples);
- } finally {
- tStream.close();
+ Map<String, Object> values = new HashMap<>();
+
+ // add all string based params
+ // a value that names a let in the context is replaced by that let's value; otherwise the literal string is kept
+ for(Entry<String,String> param : stringParams.entrySet()){
+ if(streamContext.getLets().containsKey(param.getValue())){
+ values.put(param.getKey(), streamContext.getLets().get(param.getValue()));
+ }
+ else{
+ values.put(param.getKey(), param.getValue());
+ }
+ }
+
+ // add all evaluators, each evaluated via evaluateOverContext()
+ for(Entry<String,StreamEvaluator> param : evaluatorParams.entrySet()){
+ values.put(param.getKey(), param.getValue().evaluateOverContext());
+ }
+
+ // add all streams, each stored as the list of tuples it produced
+ for(Entry<String,TupleStream> param : streamParams.entrySet()){
+
+ try{
+ List<Tuple> streamTuples = new ArrayList<Tuple>();
+
+ // open the stream, closed in finally block
+ param.getValue().open();
+
+ // buffer every tuple up to EOF in memory (expensive for large streams)
+ Tuple streamTuple = param.getValue().read();
+ while(!streamTuple.EOF){
+ streamTuples.add(streamTuple);
+ streamTuple = param.getValue().read();
}
- } else if ((o instanceof StreamEvaluator)) {
- Tuple eTuple = new Tuple(streamContext.getLets());
- StreamEvaluator evaluator = (StreamEvaluator) o;
- Object eo = evaluator.evaluate(eTuple);
- map.put(name, eo);
- } else {
- map.put(name, streamContext.getLets().get(o.toString()));
+
+ values.put(param.getKey(), streamTuples);
}
+ finally{
+ // always close the stream, even if open() or read() threw
+ param.getValue().close();
+ }
}
- return new Tuple(map);
+
+ return new Tuple(values);
}
}
public void close() throws IOException {
+ // Nothing to do here
}
public void open() throws IOException {
-
+ // nothing to do here
}
/** Return the stream sort - ie, the order in which records are returned */
public StreamComparator getStreamSort(){
- return null;
+ return new SingleValueComparator();
}
public int getCost() {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/460b3b36/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
index f03bf48..703acf4 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
@@ -377,6 +377,18 @@ public class StreamFactory implements Serializable {
throw new IOException(String.format(Locale.ROOT,"Invalid evaluator expression %s - function '%s' is unknown (not mapped to a valid StreamEvaluator)", expression, expression.getFunctionName()));
}
+ public boolean isStream(StreamExpression expression) throws IOException{
+ String function = expression.getFunctionName();
+ if(functionNames.containsKey(function)){
+ Class<? extends Expressible> clazz = functionNames.get(function);
+ if(Expressible.class.isAssignableFrom(clazz) && TupleStream.class.isAssignableFrom(clazz)){
+ return true;
+ }
+ }
+
+ return false;
+ }
+
public boolean isEvaluator(StreamExpression expression) throws IOException{
String function = expression.getFunctionName();
if(functionNames.containsKey(function)){
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/460b3b36/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/AbsoluteValueEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/AbsoluteValueEvaluatorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/AbsoluteValueEvaluatorTest.java
index ff2384c..370a3e8 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/AbsoluteValueEvaluatorTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/AbsoluteValueEvaluatorTest.java
@@ -23,7 +23,9 @@ import java.util.Map;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.eval.AbsoluteValueEvaluator;
+import org.apache.solr.client.solrj.io.eval.AddEvaluator;
import org.apache.solr.client.solrj.io.eval.StreamEvaluator;
+import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.junit.Test;
@@ -38,7 +40,8 @@ public class AbsoluteValueEvaluatorTest extends LuceneTestCase {
super();
factory = new StreamFactory()
- .withFunctionName("abs", AbsoluteValueEvaluator.class);
+ .withFunctionName("abs", AbsoluteValueEvaluator.class)
+ .withFunctionName("add", AddEvaluator.class);
values = new HashMap<String,Object>();
}
@@ -65,6 +68,34 @@ public class AbsoluteValueEvaluatorTest extends LuceneTestCase {
Assert.assertTrue(result instanceof Double);
Assert.assertEquals(1.1D, result);
}
+
+ @Test
+ public void absoluteValueFromContext() throws Exception{
+ StreamEvaluator evaluator = factory.constructEvaluator("abs(a)");
+ StreamContext context = new StreamContext();
+ evaluator.setStreamContext(context);
+ Object result;
+
+ context.getLets().put("a", 1);
+ result = evaluator.evaluate(new Tuple());
+ Assert.assertTrue(result instanceof Long);
+ Assert.assertEquals(1L, result);
+
+ context.getLets().put("a", 1.1);
+ result = evaluator.evaluate(new Tuple());
+ Assert.assertTrue(result instanceof Double);
+ Assert.assertEquals(1.1D, result);
+
+ context.getLets().put("a", -1.1);
+ result = evaluator.evaluate(new Tuple());
+ Assert.assertTrue(result instanceof Double);
+ Assert.assertEquals(1.1D, result);
+
+ context.getLets().put("a", factory.constructEvaluator("add(4,-6,34,-56)"));
+ result = evaluator.evaluate(new Tuple());
+ Assert.assertTrue(result instanceof Long);
+ Assert.assertEquals(24L, result);
+ }
@Test(expected = IOException.class)
public void absNoField() throws Exception{