You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by dp...@apache.org on 2016/04/15 03:40:58 UTC
lucene-solr:master: SOLR-8962: Adds a Sort stream w/sort function name
Repository: lucene-solr
Updated Branches:
refs/heads/master 7c098913e -> eb74d814b
SOLR-8962: Adds a Sort stream w/sort function name
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/eb74d814
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/eb74d814
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/eb74d814
Branch: refs/heads/master
Commit: eb74d814bb760cfd2f7234183f2db3d4f09ec48b
Parents: 7c09891
Author: Dennis Gove <dp...@gmail.com>
Authored: Sun Apr 10 09:18:42 2016 -0400
Committer: Dennis Gove <dp...@gmail.com>
Committed: Thu Apr 14 21:39:26 2016 -0400
----------------------------------------------------------------------
solr/CHANGES.txt | 3 +
.../org/apache/solr/handler/StreamHandler.java | 22 ++-
.../solr/client/solrj/io/stream/SortStream.java | 173 +++++++++++++++++++
.../solrj/io/stream/StreamExpressionTest.java | 42 +++++
4 files changed, 231 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/eb74d814/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index e236c19..1e1e7d9 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -93,6 +93,9 @@ New Features
* SOLR-8976: Add SolrJ support for REBALANCELEADERS Collections API (Anshum Gupta)
+* SOLR-8962: Add sort Streaming Expression. The expression takes a single input stream and a
+ comparator and outputs tuples in stable order of the comparator. (Dennis Gove)
+
Bug Fixes
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/eb74d814/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index 226058e..5ddd312 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -95,8 +95,14 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
}
streamFactory
- // streams
+ // source streams
.withFunctionName("search", CloudSolrStream.class)
+ .withFunctionName("facet", FacetStream.class)
+ .withFunctionName("update", UpdateStream.class)
+ .withFunctionName("jdbc", JDBCStream.class)
+ .withFunctionName("topic", TopicStream.class)
+
+ // decorator streams
.withFunctionName("merge", MergeStream.class)
.withFunctionName("unique", UniqueStream.class)
.withFunctionName("top", RankStream.class)
@@ -109,17 +115,15 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
.withFunctionName("leftOuterJoin", LeftOuterJoinStream.class)
.withFunctionName("hashJoin", HashJoinStream.class)
.withFunctionName("outerHashJoin", OuterHashJoinStream.class)
- .withFunctionName("facet", FacetStream.class)
- .withFunctionName("update", UpdateStream.class)
- .withFunctionName("jdbc", JDBCStream.class)
.withFunctionName("intersect", IntersectStream.class)
.withFunctionName("complement", ComplementStream.class)
- .withFunctionName("daemon", DaemonStream.class)
- .withFunctionName("topic", TopicStream.class)
- .withFunctionName("shortestPath", ShortestPathStream.class)
-
+ .withFunctionName("daemon", DaemonStream.class)
+ .withFunctionName("sort", SortStream.class)
+
+ // graph streams
+ .withFunctionName("shortestPath", ShortestPathStream.class)
- // metrics
+ // metrics
.withFunctionName("min", MinMetric.class)
.withFunctionName("max", MaxMetric.class)
.withFunctionName("avg", MeanMetric.class)
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/eb74d814/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SortStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SortStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SortStream.java
new file mode 100644
index 0000000..d9a8526
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SortStream.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+
+/**
+ * Emits the tuples of an underlying stream ordered by a comparator.
+ *
+ * <p>Every tuple of the underlying stream is read into memory during {@link #open()} and sorted
+ * there; {@link #read()} then hands the tuples out one at a time, followed by the EOF tuple of the
+ * underlying stream. When built from an expression the stream expects exactly one inner stream
+ * and a {@code by} parameter, e.g. {@code sort(search(...), by="a_i asc, a_f desc")}.
+ */
+public class SortStream extends TupleStream implements Expressible {
+
+  private static final long serialVersionUID = 1;
+
+  private TupleStream stream;
+  private StreamComparator comparator;
+  private Worker worker;
+
+  /**
+   * Creates a sort over an already constructed stream.
+   *
+   * @param stream the stream whose tuples are to be sorted
+   * @param comp the comparator defining the output order
+   * @throws IOException declared for parity with the expression based constructor
+   */
+  public SortStream(TupleStream stream, StreamComparator comp) throws IOException {
+    init(stream,comp);
+  }
+
+  /**
+   * Creates a sort from a streaming expression.
+   *
+   * @param expression the expression to parse; must hold exactly one stream operand and one
+   *        {@code by} named operand and nothing else
+   * @param factory the factory used to construct the inner stream and the comparator
+   * @throws IOException if the expression does not have the expected shape or the inner stream
+   *         or comparator cannot be constructed
+   */
+  public SortStream(StreamExpression expression,StreamFactory factory) throws IOException {
+    // grab all parameters out
+    List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
+    StreamExpressionNamedParameter byExpression = factory.getNamedOperand(expression, "by");
+
+    // check the specific operands first so that a missing stream or 'by' is reported as such
+    // rather than as an unknown operand
+    if(1 != streamExpressions.size()){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
+    }
+
+    if(null == byExpression || !(byExpression.getParameter() instanceof StreamExpressionValue)){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'by' parameter listing fields to sort over but didn't find one",expression));
+    }
+
+    // the stream and 'by' are accounted for; anything beyond those is not something we understand
+    if(expression.getParameters().size() != streamExpressions.size() + 1){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
+    }
+
+    init(
+      factory.constructStream(streamExpressions.get(0)),
+      factory.constructComparator(((StreamExpressionValue)byExpression.getParameter()).getValue(), FieldComparator.class)
+    );
+  }
+
+  /** Stores the stream and comparator and sets up the in-memory sort worker. */
+  private void init(TupleStream stream, StreamComparator comp) throws IOException{
+    this.stream = stream;
+    this.comparator = comp;
+
+    // buffers everything, then relies on the JDK's stable List.sort
+    worker = new Worker() {
+
+      private final List<Tuple> tuples = new ArrayList<Tuple>();
+      private int cursor;
+      private Tuple eofTuple;
+
+      public void readStream(TupleStream stream) throws IOException {
+        // start from a clean slate so that re-opening does not re-emit tuples of a previous pass
+        tuples.clear();
+        cursor = 0;
+        eofTuple = null;
+
+        Tuple tuple = stream.read();
+        while(!tuple.EOF){
+          tuples.add(tuple);
+          tuple = stream.read();
+        }
+        eofTuple = tuple;
+      }
+
+      public void sort() {
+        tuples.sort(comparator);
+      }
+
+      public Tuple read() {
+        if(cursor >= tuples.size()){
+          return eofTuple;
+        }
+        // List.set returns the previous element: hand it out and drop our reference to it
+        return tuples.set(cursor++, null);
+      }
+    };
+
+  }
+
+  /**
+   * Converts this stream back into its expression form: the inner stream followed by a
+   * {@code by} parameter holding the comparator.
+   *
+   * @param factory the factory used to look up the function name registered for this class
+   * @return the expression describing this stream
+   * @throws IOException if the inner stream or the comparator is not {@link Expressible}
+   */
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException {
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+    // streams
+    if(stream instanceof Expressible){
+      expression.addParameter(((Expressible)stream).toExpression(factory));
+    }
+    else{
+      throw new IOException("This SortStream contains a non-expressible TupleStream - it cannot be converted to an expression");
+    }
+
+    // by
+    if(comparator instanceof Expressible){
+      expression.addParameter(new StreamExpressionNamedParameter("by",((Expressible)comparator).toExpression(factory)));
+    }
+    else{
+      throw new IOException("This SortStream contains a non-expressible comparator - it cannot be converted to an expression");
+    }
+
+    return expression;
+  }
+
+  /** Passes the context on to the underlying stream. */
+  public void setStreamContext(StreamContext context) {
+    this.stream.setStreamContext(context);
+  }
+
+  /** Returns a new list holding the single underlying stream. */
+  public List<TupleStream> children() {
+    List<TupleStream> l = new ArrayList<TupleStream>();
+    l.add(stream);
+    return l;
+  }
+
+  /**
+   * Opens the underlying stream, reads it through to its EOF tuple and sorts what was read.
+   *
+   * @throws IOException if opening or reading the underlying stream fails
+   */
+  public void open() throws IOException {
+    stream.open();
+
+    worker.readStream(stream);
+    worker.sort();
+  }
+
+  /** Closes the underlying stream. */
+  public void close() throws IOException {
+    stream.close();
+  }
+
+  /**
+   * Returns the next tuple in sorted order, or the EOF tuple of the underlying stream once all
+   * sorted tuples have been handed out. Returns null if the stream has not been opened yet.
+   */
+  public Tuple read() throws IOException {
+    // return next from sorted order
+    return worker.read();
+  }
+
+  /** Return the stream sort - ie, the order in which records are returned */
+  public StreamComparator getStreamSort(){
+    return comparator;
+  }
+
+  public int getCost() {
+    return 0;
+  }
+
+  /** The buffering and sorting steps used by {@link #open()} and {@link #read()}. */
+  private interface Worker {
+    public void readStream(TupleStream stream) throws IOException;
+    public void sort();
+    public Tuple read();
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/eb74d814/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index e7f57c1..9ae6761 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -134,6 +134,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
testRankStream();
testReducerStream();
testUniqueStream();
+ testSortStream();
testRollupStream();
testStatsStream();
testNulls();
@@ -306,6 +307,47 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
commit();
}
+  /**
+   * Runs the sort expression over a search sorted by a_f and checks the resulting id order for
+   * an ascending, a descending and a two-field comparator.
+   */
+  private void testSortStream() throws Exception {
+
+    // Documents 1 and 5 share a_i=1 but carry different a_f values, so every sort below has a tie on a_i
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+    indexr(id, "5", "a_s", "hello1", "a_i", "1", "a_f", "2");
+    commit();
+
+    StreamFactory streamFactory = new StreamFactory()
+      .withCollectionZkHost("collection1", zkServer.getZkAddress())
+      .withFunctionName("search", CloudSolrStream.class)
+      .withFunctionName("sort", SortStream.class);
+
+    // a_i ascending - the tied documents are expected in the a_f order of the search
+    TupleStream ascendingStream = streamFactory.constructStream("sort(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc\")");
+    List<Tuple> ascendingTuples = getTuples(ascendingStream);
+    assert(6 == ascendingTuples.size());
+    assertOrder(ascendingTuples, 0,1,5,2,3,4);
+
+    // a_i descending - the tied documents are again expected in the a_f order of the search
+    TupleStream descendingStream = streamFactory.constructStream("sort(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i desc\")");
+    List<Tuple> descendingTuples = getTuples(descendingStream);
+    assert(6 == descendingTuples.size());
+    assertOrder(descendingTuples, 4,3,2,1,5,0);
+
+    // a_i ascending with a_f descending as the tie breaker
+    TupleStream multiFieldStream = streamFactory.constructStream("sort(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc, a_f desc\")");
+    List<Tuple> multiFieldTuples = getTuples(multiFieldStream);
+    assert(6 == multiFieldTuples.size());
+    assertOrder(multiFieldTuples, 0,5,1,2,3,4);
+
+    // leave the collection empty again
+    del("*:*");
+    commit();
+  }
+
private void testNulls() throws Exception {