You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by dp...@apache.org on 2017/04/29 01:50:33 UTC

lucene-solr:master: SOLR-10559: Updates TupStream and enhances evaluators to work over values in the StreamContext

Repository: lucene-solr
Updated Branches:
  refs/heads/master 7f6f68c7f -> 460b3b36e


SOLR-10559: Updates TupStream and enhances evaluators to work over values in the StreamContext


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/460b3b36
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/460b3b36
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/460b3b36

Branch: refs/heads/master
Commit: 460b3b36e9544b4be75cf1d25c89d8bde8ec5c74
Parents: 7f6f68c
Author: Dennis Gove <dp...@gmail.com>
Authored: Fri Apr 28 21:45:56 2017 -0400
Committer: Dennis Gove <dp...@gmail.com>
Committed: Fri Apr 28 21:45:56 2017 -0400

----------------------------------------------------------------------
 .../org/apache/solr/client/solrj/io/Tuple.java  |   4 +
 .../solrj/io/comp/SingleValueComparator.java    |  65 +++++++++
 .../client/solrj/io/eval/ComplexEvaluator.java  |   7 +
 .../client/solrj/io/eval/FieldEvaluator.java    |  17 ++-
 .../client/solrj/io/eval/SimpleEvaluator.java   |   3 +
 .../client/solrj/io/eval/StreamEvaluator.java   |  26 +++-
 .../solr/client/solrj/io/stream/TupStream.java  | 145 ++++++++++++-------
 .../solrj/io/stream/expr/StreamFactory.java     |  12 ++
 .../stream/eval/AbsoluteValueEvaluatorTest.java |  33 ++++-
 9 files changed, 260 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/460b3b36/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
index fdf44c9..d82c864 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
@@ -48,6 +48,10 @@ public class Tuple implements Cloneable, MapWriter {
   public List<String> fieldNames;
   public Map<String, String> fieldLabels;
 
+  public Tuple(){
+    // just an empty tuple
+  }
+  
   public Tuple(Map fields) {
     if(fields.containsKey("EOF")) {
       EOF = true;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/460b3b36/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/SingleValueComparator.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/SingleValueComparator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/SingleValueComparator.java
new file mode 100644
index 0000000..c429072
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/SingleValueComparator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.comp;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ *  An equality Comparator to be used when a stream will only ever return a single field,
+ *  ie, it has no sorted order
+ **/
+public class SingleValueComparator implements StreamComparator {
+
+  private static final long serialVersionUID = 1;
+  private UUID comparatorNodeId = UUID.randomUUID();
+    
+  public StreamExpressionParameter toExpression(StreamFactory factory){
+    return null; // this comparator produces no expression form
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+    return null; // this comparator produces no explanation
+  }
+  
+  public int compare(Tuple leftTuple, Tuple rightTuple) {
+    return -1; // always -1 regardless of arguments, intended to keep everything in same order. NOTE(review): compare(a,b) and compare(b,a) are both -1 - confirm no caller sorts with this
+  }
+  
+  @Override
+  public boolean isDerivedFrom(StreamComparator base){
+    // this doesn't sort, so everything else is a match
+    return true;
+  }
+  
+  @Override
+  public SingleValueComparator copyAliased(Map<String,String> aliases){
+    return this;
+  }
+  
+  @Override
+  public StreamComparator append(StreamComparator other){
+    return other;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/460b3b36/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/ComplexEvaluator.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/ComplexEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/ComplexEvaluator.java
index 59a4653..ea4c88c 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/ComplexEvaluator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/ComplexEvaluator.java
@@ -101,5 +101,12 @@ public abstract class ComplexEvaluator implements StreamEvaluator {
   
   public void setStreamContext(StreamContext context) {
     this.streamContext = context;
+    
+    for(StreamEvaluator subEvaluator : subEvaluators){
+      subEvaluator.setStreamContext(context);
+    }
+  }
+  public StreamContext getStreamContext(){
+    return streamContext;
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/460b3b36/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/FieldEvaluator.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/FieldEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/FieldEvaluator.java
index 3251498..501e9d5 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/FieldEvaluator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/FieldEvaluator.java
@@ -44,9 +44,24 @@ public class FieldEvaluator extends SimpleEvaluator {
   }
   
   @Override
-  public Object evaluate(Tuple tuple) {
+  public Object evaluate(Tuple tuple) throws IOException {
     Object value = tuple.get(fieldName);
     
+    // This is somewhat radical.
+    // Here, we allow for the use of the context to provide alternative values
+    // when they are not available in the provided tuple. This means that all
+    // evaluators can evaluate over both a stream's tuple and the context, and
+    // can even evaluate over fields from both of them in the same evaluation
+    if(null == value && null != getStreamContext()){
+      value = getStreamContext().getLets().get(fieldName);
+      
+      // If what's contained in the context is itself an evaluator then
+      // we need to evaluate it
+      if(value instanceof StreamEvaluator){
+        value = ((StreamEvaluator)value).evaluate(tuple);
+      }
+    }
+    
     // if we have an array then convert to an ArrayList
     // if we have an iterable that is not a list then convert to ArrayList
     // lists are good to go

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/460b3b36/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/SimpleEvaluator.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/SimpleEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/SimpleEvaluator.java
index 5ee1715..4a095f8 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/SimpleEvaluator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/SimpleEvaluator.java
@@ -32,5 +32,8 @@ public abstract class SimpleEvaluator implements StreamEvaluator {
   public void setStreamContext(StreamContext streamContext) {
     this.streamContext = streamContext;
   }
+  public StreamContext getStreamContext(){
+    return streamContext;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/460b3b36/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/StreamEvaluator.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/StreamEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/StreamEvaluator.java
index 1774c46..e82d5d3 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/StreamEvaluator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/StreamEvaluator.java
@@ -27,6 +27,30 @@ import org.apache.solr.client.solrj.io.stream.StreamContext;
 import org.apache.solr.client.solrj.io.stream.expr.Expressible;
 
 public interface StreamEvaluator extends Expressible, Serializable {
-  Object evaluate(final Tuple tuple) throws IOException;
   void setStreamContext(StreamContext streamContext);
+  StreamContext getStreamContext();
+  
+  Object evaluate(final Tuple tuple) throws IOException;
+  
+  /**
+   * Execute the evaluator over lets stored within the StreamContext. This allows 
+   * evaluators to be executed over values calculated elsewhere in the pipeline
+   * and stored in the {@link StreamContext#getLets() streamContext.lets}.
+   * <p>
+   * Default implementation just creates a tuple out of all values in the context 
+   * and passes that to {@link StreamEvaluator#evaluate(Tuple)}.
+   * 
+   * @return Evaluated value, or null when {@link #getStreamContext()} returns null
+   * @throws IOException thrown on error during evaluation
+   */
+  default Object evaluateOverContext() throws IOException{
+    StreamContext context = getStreamContext();
+    if(null != context){
+      Tuple contextTuple = new Tuple(context.getLets());
+      return evaluate(contextTuple);
+    }
+    
+    return null;
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/460b3b36/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
index 57c7b76..8a71ae6 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
@@ -20,10 +20,12 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
-import java.util.Set;
+import java.util.Map.Entry;
 
 import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.SingleValueComparator;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.client.solrj.io.eval.StreamEvaluator;
 import org.apache.solr.client.solrj.io.stream.expr.Explanation;
@@ -40,27 +42,34 @@ public class TupStream extends TupleStream implements Expressible {
 
   private static final long serialVersionUID = 1;
   private StreamContext streamContext;
-  private Map tupleParams = new HashMap();
+  
+  private Map<String,String> stringParams = new HashMap<>();
+  private Map<String,StreamEvaluator> evaluatorParams = new HashMap<>();
+  private Map<String,TupleStream> streamParams = new HashMap<>();
+  
   private boolean finished;
 
   public TupStream(StreamExpression expression, StreamFactory factory) throws IOException {
 
     List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
     //Get all the named params
-    for(StreamExpressionParameter np : namedParams) {
-      String name = ((StreamExpressionNamedParameter)np).getName();
-      StreamExpressionParameter param = ((StreamExpressionNamedParameter)np).getParameter();
+    for(StreamExpressionNamedParameter np : namedParams) {
+      String name = np.getName();
+      StreamExpressionParameter param = np.getParameter();
 
+      // we're going to split these up here so we only make the choice once
+      // order of these in read() doesn't matter
       if(param instanceof StreamExpressionValue) {
-        tupleParams.put(name, ((StreamExpressionValue)param).getValue());
-      } else {
-        if (factory.isEvaluator((StreamExpression) param)) {
-          StreamEvaluator evaluator = factory.constructEvaluator((StreamExpression) param);
-          tupleParams.put(name, evaluator);
-        } else {
-          TupleStream tupleStream = factory.constructStream((StreamExpression) param);
-          tupleParams.put(name, tupleStream);
-        }
+        stringParams.put(name, ((StreamExpressionValue)param).getValue());
+      } else if (factory.isEvaluator((StreamExpression) param)) {
+        StreamEvaluator evaluator = factory.constructEvaluator((StreamExpression) param);
+        evaluatorParams.put(name, evaluator);
+      } else if(factory.isStream((StreamExpression)param)) {
+        TupleStream tupleStream = factory.constructStream((StreamExpression) param);
+        streamParams.put(name, tupleStream);
+      }
+      else{
+        throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - only string, evaluator, or stream named parameters are supported, but param %s is none of those",expression, name)); // name is a String, so it must be formatted with %s
       }
     }
   }
@@ -73,6 +82,26 @@ public class TupStream extends TupleStream implements Expressible {
   private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
     // function name
     StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+    
+    // add string based params
+    for(Entry<String,String> param : stringParams.entrySet()){
+      expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), param.getValue()));
+    }
+    
+    // add evaluator based params
+    for(Entry<String,StreamEvaluator> param : evaluatorParams.entrySet()){
+      expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), param.getValue().toExpression(factory)));
+    }
+
+    // add stream based params
+    for(Entry<String,TupleStream> param : streamParams.entrySet()){
+      if(includeStreams){
+        expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), ((Expressible)param.getValue()).toExpression(factory)));
+      }
+      else{
+        expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), "<stream>"));
+      }
+    }
 
     return expression;
   }
@@ -91,6 +120,15 @@ public class TupStream extends TupleStream implements Expressible {
 
   public void setStreamContext(StreamContext context) {
     this.streamContext = context;
+    
+    // also hand the same context to every evaluator and stream parameter
+    for(StreamEvaluator evaluator : evaluatorParams.values()){
+      evaluator.setStreamContext(context);
+    }
+    
+    for(TupleStream stream : streamParams.values()){
+      stream.setStreamContext(context);
+    }
   }
 
   public List<TupleStream> children() {
@@ -101,59 +139,68 @@ public class TupStream extends TupleStream implements Expressible {
   public Tuple read() throws IOException {
 
     if(finished) {
-      Map m = new HashMap();
+      Map<String,Object> m = new HashMap<>();
       m.put("EOF", true);
       return new Tuple(m);
     } else {
       finished = true;
-      Map<String, Object> map = new HashMap();
-      Set<Map.Entry<String, Object>> entries = tupleParams.entrySet();
-
-      for (Map.Entry<String, Object> entry : entries) {
-        String name = entry.getKey();
-        Object o = entry.getValue();
-        if (o instanceof TupleStream) {
-          List<Tuple> tuples = new ArrayList();
-          TupleStream tStream = (TupleStream) o;
-          tStream.setStreamContext(streamContext);
-          try {
-            tStream.open();
-            TUPLES:
-            while (true) {
-              Tuple tuple = tStream.read();
-              if (tuple.EOF) {
-                break TUPLES;
-              } else {
-                tuples.add(tuple);
-              }
-            }
-            map.put(name, tuples);
-          } finally {
-            tStream.close();
+      Map<String, Object> values = new HashMap<>();
+      
+      // add all string based params
+      // these come from the context when it is set and holds a matching let, otherwise they are treated as straight strings
+      for(Entry<String,String> param : stringParams.entrySet()){
+        if(null != streamContext && streamContext.getLets().containsKey(param.getValue())){
+          values.put(param.getKey(), streamContext.getLets().get(param.getValue()));
+        }
+        else{
+          values.put(param.getKey(), param.getValue());
+        }
+      }
+      
+      // add all evaluators
+      for(Entry<String,StreamEvaluator> param : evaluatorParams.entrySet()){
+        values.put(param.getKey(), param.getValue().evaluateOverContext());
+      }
+      
+      // Add all streams
+      for(Entry<String,TupleStream> param : streamParams.entrySet()){
+        
+        try{
+          List<Tuple> streamTuples = new ArrayList<Tuple>();
+          
+          // open the stream, closed in finally block
+          param.getValue().open();
+          
+          // read all values from stream (memory expensive)
+          Tuple streamTuple = param.getValue().read();
+          while(!streamTuple.EOF){
+            streamTuples.add(streamTuple);
+            streamTuple = param.getValue().read();
           }
-        } else if ((o instanceof StreamEvaluator))  {
-          Tuple eTuple = new Tuple(streamContext.getLets());
-          StreamEvaluator evaluator = (StreamEvaluator) o;
-          Object eo = evaluator.evaluate(eTuple);
-          map.put(name, eo);
-        } else {
-          map.put(name, streamContext.getLets().get(o.toString()));
+          
+          values.put(param.getKey(), streamTuples);
         }
+        finally{
+          // safely close the stream
+          param.getValue().close();
+        }        
       }
-      return new Tuple(map);
+
+      return new Tuple(values);
     }
   }
 
   public void close() throws IOException {
+    // Nothing to do here
   }
 
   public void open() throws IOException {
-
+    // nothing to do here
   }
 
   /** Return the stream sort - ie, the order in which records are returned */
   public StreamComparator getStreamSort(){
-    return null;
+    return new SingleValueComparator();
   }
 
   public int getCost() {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/460b3b36/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
index f03bf48..703acf4 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
@@ -377,6 +377,18 @@ public class StreamFactory implements Serializable {
     throw new IOException(String.format(Locale.ROOT,"Invalid evaluator expression %s - function '%s' is unknown (not mapped to a valid StreamEvaluator)", expression, expression.getFunctionName()));
   }
 
+  public boolean isStream(StreamExpression expression) throws IOException{
+    String function = expression.getFunctionName();
+    if(functionNames.containsKey(function)){
+      Class<? extends Expressible> clazz = functionNames.get(function);
+      if(Expressible.class.isAssignableFrom(clazz) && TupleStream.class.isAssignableFrom(clazz)){
+        return true;
+      }
+    }
+
+    return false;
+  }
+  
   public boolean isEvaluator(StreamExpression expression) throws IOException{
     String function = expression.getFunctionName();
     if(functionNames.containsKey(function)){

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/460b3b36/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/AbsoluteValueEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/AbsoluteValueEvaluatorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/AbsoluteValueEvaluatorTest.java
index ff2384c..370a3e8 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/AbsoluteValueEvaluatorTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/AbsoluteValueEvaluatorTest.java
@@ -23,7 +23,9 @@ import java.util.Map;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.eval.AbsoluteValueEvaluator;
+import org.apache.solr.client.solrj.io.eval.AddEvaluator;
 import org.apache.solr.client.solrj.io.eval.StreamEvaluator;
+import org.apache.solr.client.solrj.io.stream.StreamContext;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 import org.junit.Test;
 
@@ -38,7 +40,8 @@ public class AbsoluteValueEvaluatorTest extends LuceneTestCase {
     super();
     
     factory = new StreamFactory()
-      .withFunctionName("abs", AbsoluteValueEvaluator.class);
+      .withFunctionName("abs", AbsoluteValueEvaluator.class)
+      .withFunctionName("add", AddEvaluator.class);
     values = new HashMap<String,Object>();
   }
     
@@ -65,6 +68,34 @@ public class AbsoluteValueEvaluatorTest extends LuceneTestCase {
     Assert.assertTrue(result instanceof Double);
     Assert.assertEquals(1.1D, result);
   }
+  
+  @Test
+  public void absoluteValueFromContext() throws Exception{
+    StreamEvaluator evaluator = factory.constructEvaluator("abs(a)");
+    StreamContext context = new StreamContext();
+    evaluator.setStreamContext(context);
+    Object result;
+    
+    context.getLets().put("a", 1);
+    result = evaluator.evaluate(new Tuple());
+    Assert.assertTrue(result instanceof Long);
+    Assert.assertEquals(1L, result);
+    
+    context.getLets().put("a", 1.1);
+    result = evaluator.evaluate(new Tuple());
+    Assert.assertTrue(result instanceof Double);
+    Assert.assertEquals(1.1D, result);
+    
+    context.getLets().put("a", -1.1);
+    result = evaluator.evaluate(new Tuple());
+    Assert.assertTrue(result instanceof Double);
+    Assert.assertEquals(1.1D, result);
+    
+    context.getLets().put("a", factory.constructEvaluator("add(4,-6,34,-56)"));
+    result = evaluator.evaluate(new Tuple());
+    Assert.assertTrue(result instanceof Long);
+    Assert.assertEquals(24L, result);
+  }
 
   @Test(expected = IOException.class)
   public void absNoField() throws Exception{