You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by dp...@apache.org on 2016/04/15 03:40:58 UTC

lucene-solr:master: SOLR-8962: Adds a Sort stream w/sort function name

Repository: lucene-solr
Updated Branches:
  refs/heads/master 7c098913e -> eb74d814b


SOLR-8962: Adds a Sort stream w/sort function name


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/eb74d814
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/eb74d814
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/eb74d814

Branch: refs/heads/master
Commit: eb74d814bb760cfd2f7234183f2db3d4f09ec48b
Parents: 7c09891
Author: Dennis Gove <dp...@gmail.com>
Authored: Sun Apr 10 09:18:42 2016 -0400
Committer: Dennis Gove <dp...@gmail.com>
Committed: Thu Apr 14 21:39:26 2016 -0400

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   3 +
 .../org/apache/solr/handler/StreamHandler.java  |  22 ++-
 .../solr/client/solrj/io/stream/SortStream.java | 173 +++++++++++++++++++
 .../solrj/io/stream/StreamExpressionTest.java   |  42 +++++
 4 files changed, 231 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/eb74d814/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index e236c19..1e1e7d9 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -93,6 +93,9 @@ New Features
 
 * SOLR-8976: Add SolrJ support for REBALANCELEADERS Collections API (Anshum Gupta)
 
+* SOLR-8961: Add sort Streaming Expression. The expression takes a single input stream and a 
+  comparator and outputs tuples in stable order of the comparator. (Dennis Gove)
+
 Bug Fixes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/eb74d814/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index 226058e..5ddd312 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -95,8 +95,14 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
     }
 
      streamFactory
-       // streams
+       // source streams
       .withFunctionName("search", CloudSolrStream.class)
+      .withFunctionName("facet", FacetStream.class)
+      .withFunctionName("update", UpdateStream.class)
+      .withFunctionName("jdbc", JDBCStream.class)
+      .withFunctionName("topic", TopicStream.class)
+      
+      // decorator streams
       .withFunctionName("merge", MergeStream.class)
       .withFunctionName("unique", UniqueStream.class)
       .withFunctionName("top", RankStream.class)
@@ -109,17 +115,15 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
       .withFunctionName("leftOuterJoin", LeftOuterJoinStream.class) 
       .withFunctionName("hashJoin", HashJoinStream.class)
       .withFunctionName("outerHashJoin", OuterHashJoinStream.class)
-      .withFunctionName("facet", FacetStream.class)
-      .withFunctionName("update", UpdateStream.class)
-      .withFunctionName("jdbc", JDBCStream.class)
       .withFunctionName("intersect", IntersectStream.class)
       .withFunctionName("complement", ComplementStream.class)
-         .withFunctionName("daemon", DaemonStream.class)
-         .withFunctionName("topic", TopicStream.class)
-         .withFunctionName("shortestPath", ShortestPathStream.class)
-
+      .withFunctionName("daemon", DaemonStream.class)
+      .withFunctionName("sort", SortStream.class)
+      
+      // graph streams
+      .withFunctionName("shortestPath", ShortestPathStream.class)
 
-    // metrics
+      // metrics
       .withFunctionName("min", MinMetric.class)
       .withFunctionName("max", MaxMetric.class)
       .withFunctionName("avg", MeanMetric.class)

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/eb74d814/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SortStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SortStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SortStream.java
new file mode 100644
index 0000000..d9a8526
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SortStream.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+
+/**
+ * The SortStream emits a stream of Tuples sorted by a Comparator.
+ **/
+
+public class SortStream extends TupleStream implements Expressible {
+
+  private static final long serialVersionUID = 1;
+
+  private TupleStream stream;
+  private StreamComparator comparator;
+  private Worker worker;
+
+  public SortStream(TupleStream stream, StreamComparator comp) throws IOException {
+    init(stream,comp);
+  }
+  
+  public SortStream(StreamExpression expression,StreamFactory factory) throws IOException {
+    // grab all parameters out
+    List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
+    StreamExpressionNamedParameter byExpression = factory.getNamedOperand(expression, "by");
+    
+    // validate expression contains only what we want.
+    if(expression.getParameters().size() != streamExpressions.size() + 1){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
+    }
+    
+    if(1 != streamExpressions.size()){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
+    }
+    
+    if(null == byExpression || !(byExpression.getParameter() instanceof StreamExpressionValue)){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'by' parameter listing fields to sort over but didn't find one",expression));
+    }
+    
+    init(
+          factory.constructStream(streamExpressions.get(0)),
+          factory.constructComparator(((StreamExpressionValue)byExpression.getParameter()).getValue(), FieldComparator.class)
+        );
+  }
+  
+  private void init(TupleStream stream, StreamComparator comp) throws IOException{
+    this.stream = stream;
+    this.comparator = comp;
+    
+    // standard java modified merge sort
+    worker = new Worker() {
+
+      private LinkedList<Tuple> tuples = new LinkedList<Tuple>();
+      private Tuple eofTuple;
+      
+      public void readStream(TupleStream stream) throws IOException {
+        Tuple tuple = stream.read();
+        while(!tuple.EOF){
+          tuples.add(tuple);
+          tuple = stream.read();
+        }
+        eofTuple = tuple;
+      }
+      
+      public void sort() {
+        tuples.sort(comparator);
+      }
+      
+      public Tuple read() {
+        if(tuples.isEmpty()){
+          return eofTuple;
+        }
+        return tuples.removeFirst();
+      }
+    };
+    
+  }
+
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException {    
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+    
+    // streams
+    if(stream instanceof Expressible){
+      expression.addParameter(((Expressible)stream).toExpression(factory));
+    }
+    else{
+      throw new IOException("This SortStream contains a non-expressible TupleStream - it cannot be converted to an expression");
+    }
+    
+    // by
+    if(comparator instanceof Expressible){
+      expression.addParameter(new StreamExpressionNamedParameter("by",((Expressible)comparator).toExpression(factory)));
+    }
+    else{
+      throw new IOException("This SortStream contains a non-expressible equalitor - it cannot be converted to an expression");
+    }
+    
+    return expression;   
+  }
+    
+  public void setStreamContext(StreamContext context) {
+    this.stream.setStreamContext(context);
+  }
+
+  public List<TupleStream> children() {
+    List<TupleStream> l =  new ArrayList<TupleStream>();
+    l.add(stream);
+    return l;
+  }
+
+  public void open() throws IOException {
+    stream.open();
+
+    worker.readStream(stream);
+    worker.sort();
+  }
+
+  public void close() throws IOException {
+    stream.close();
+  }
+
+  public Tuple read() throws IOException {
+    // return next from sorted order
+    return worker.read();    
+  }
+
+  /** Return the stream sort - ie, the order in which records are returned */
+  public StreamComparator getStreamSort(){
+    return comparator;
+  }
+  
+  public int getCost() {
+    return 0;
+  }
+
+  private interface Worker {
+    public void readStream(TupleStream stream) throws IOException;
+    public void sort();
+    public Tuple read();
+  }
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/eb74d814/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index e7f57c1..9ae6761 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -134,6 +134,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
     testRankStream();
     testReducerStream();
     testUniqueStream();
+    testSortStream();
     testRollupStream();
     testStatsStream();
     testNulls();
@@ -306,6 +307,47 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
     commit();
   }
 
+  private void testSortStream() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+    indexr(id, "5", "a_s", "hello1", "a_i", "1", "a_f", "2");
+    commit();
+
+    StreamExpression expression;
+    TupleStream stream;
+    List<Tuple> tuples;
+    
+    StreamFactory factory = new StreamFactory()
+      .withCollectionZkHost("collection1", zkServer.getZkAddress())
+      .withFunctionName("search", CloudSolrStream.class)
+      .withFunctionName("sort", SortStream.class);
+    
+    // Basic test
+    stream = factory.constructStream("sort(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc\")");
+    tuples = getTuples(stream);
+    assert(tuples.size() == 6);
+    assertOrder(tuples, 0,1,5,2,3,4);
+
+    // Basic test desc
+    stream = factory.constructStream("sort(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i desc\")");
+    tuples = getTuples(stream);
+    assert(tuples.size() == 6);
+    assertOrder(tuples, 4,3,2,1,5,0);
+    
+    // Basic w/multi comp
+    stream = factory.constructStream("sort(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc, a_f desc\")");
+    tuples = getTuples(stream);
+    assert(tuples.size() == 6);
+    assertOrder(tuples, 0,5,1,2,3,4);
+        
+    del("*:*");
+    commit();
+  }
+
 
   private void testNulls() throws Exception {