You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by dp...@apache.org on 2015/11/08 04:05:14 UTC
svn commit: r1713204 - in /lucene/dev/trunk/solr: ./
solrj/src/java/org/apache/solr/client/solrj/io/stream/
solrj/src/test/org/apache/solr/client/solrj/io/stream/
Author: dpgove
Date: Sun Nov 8 03:05:13 2015
New Revision: 1713204
URL: http://svn.apache.org/viewvc?rev=1713204&view=rev
Log:
SOLR-8198: Change ReducerStream to use StreamEqualitor instead of StreamComparator
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java
lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java
lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1713204&r1=1713203&r2=1713204&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Sun Nov 8 03:05:13 2015
@@ -85,6 +85,8 @@ New Features
* SOLR-7938: MergeStream now supports merging more than 2 streams together (Dennis Gove)
+* SOLR-8198: Change ReducerStream to use StreamEqualitor instead of StreamComparator (Dennis Gove)
+
Optimizations
----------------------
* SOLR-7876: Speed up queries and operations that use many terms when timeAllowed has not been
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java?rev=1713204&r1=1713203&r2=1713204&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java Sun Nov 8 03:05:13 2015
@@ -26,7 +26,11 @@ import java.util.Map;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
+import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
+import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
@@ -53,14 +57,33 @@ public class ReducerStream extends Tuple
private static final long serialVersionUID = 1;
private PushBackStream stream;
- private StreamComparator comp;
+ private StreamEqualitor eq;
private transient Tuple currentGroupHead;
+
+ public ReducerStream(TupleStream stream,StreamEqualitor eq) throws IOException {
+ init(stream,eq);
+ }
public ReducerStream(TupleStream stream,StreamComparator comp) throws IOException {
- init(stream,comp);
+ init(stream, convertToEqualitor(comp));
}
+ private StreamEqualitor convertToEqualitor(StreamComparator comp){
+ if(comp instanceof MultipleFieldComparator){
+ MultipleFieldComparator mComp = (MultipleFieldComparator)comp;
+ StreamEqualitor[] eqs = new StreamEqualitor[mComp.getComps().length];
+ for(int idx = 0; idx < mComp.getComps().length; ++idx){
+ eqs[idx] = convertToEqualitor(mComp.getComps()[idx]);
+ }
+ return new MultipleFieldEqualitor(eqs);
+ }
+ else{
+ FieldComparator fComp = (FieldComparator)comp;
+ return new FieldEqualitor(fComp.getFieldName());
+ }
+ }
+
public ReducerStream(StreamExpression expression, StreamFactory factory) throws IOException{
// grab all parameters out
List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
@@ -81,15 +104,15 @@ public class ReducerStream extends Tuple
// Reducing is always done over equality, so always use an EqualTo comparator
init(factory.constructStream(streamExpressions.get(0)),
- factory.constructComparator(((StreamExpressionValue)byExpression.getParameter()).getValue(), FieldComparator.class)
+ factory.constructEqualitor(((StreamExpressionValue)byExpression.getParameter()).getValue(), FieldEqualitor.class)
);
}
- private void init(TupleStream stream, StreamComparator comp) throws IOException{
+ private void init(TupleStream stream, StreamEqualitor eq) throws IOException{
this.stream = new PushBackStream(stream);
- this.comp = comp;
+ this.eq = eq;
- if(!comp.isDerivedFrom(stream.getStreamSort())){
+ if(!eq.isDerivedFrom(stream.getStreamSort())){
throw new IOException("Invalid ReducerStream - substream comparator (sort) must be a superset of this stream's comparator.");
}
}
@@ -103,8 +126,8 @@ public class ReducerStream extends Tuple
expression.addParameter(stream.toExpression(factory));
// over
- if(comp instanceof Expressible){
- expression.addParameter(new StreamExpressionNamedParameter("by",((Expressible)comp).toExpression(factory)));
+ if(eq instanceof Expressible){
+ expression.addParameter(new StreamExpressionNamedParameter("by",((Expressible)eq).toExpression(factory)));
}
else{
throw new IOException("This ReducerStream contains a non-expressible comparator - it cannot be converted to an expression");
@@ -155,7 +178,7 @@ public class ReducerStream extends Tuple
currentGroupHead = t;
maps.add(t.getMap());
} else {
- if(comp.compare(currentGroupHead, t) == 0) {
+ if(eq.test(currentGroupHead, t)) {
maps.add(t.getMap());
} else {
Tuple groupHead = currentGroupHead.clone();
@@ -170,7 +193,7 @@ public class ReducerStream extends Tuple
/** Return the stream sort - ie, the order in which records are returned */
public StreamComparator getStreamSort(){
- return comp;
+ return stream.getStreamSort();
}
public int getCost() {
Modified: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java?rev=1713204&r1=1713203&r2=1713204&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java Sun Nov 8 03:05:13 2015
@@ -444,7 +444,7 @@ public class StreamExpressionTest extend
// basic
expression = StreamExpressionParser.parse("group("
+ "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\"),"
- + "by=\"a_s asc\")");
+ + "by=\"a_s\")");
stream = new ReducerStream(expression, factory);
tuples = getTuples(stream);
@@ -466,7 +466,7 @@ public class StreamExpressionTest extend
// basic w/spaces
expression = StreamExpressionParser.parse("group("
+ "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\"),"
- + "by=\"a_s asc\")");
+ + "by=\"a_s\")");
stream = new ReducerStream(expression, factory);
tuples = getTuples(stream);
@@ -672,7 +672,7 @@ public class StreamExpressionTest extend
.withFunctionName("group", ReducerStream.class)
.withFunctionName("parallel", ParallelStream.class);
- ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, group(search(collection1, q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc,a_f asc\", partitionKeys=\"a_s\"), by=\"a_s asc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s asc\")");
+ ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, group(search(collection1, q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc,a_f asc\", partitionKeys=\"a_s\"), by=\"a_s\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s asc\")");
List<Tuple> tuples = getTuples(pstream);
@@ -693,7 +693,7 @@ public class StreamExpressionTest extend
//Test Descending with Ascending subsort
- pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, group(search(collection1, q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc,a_f asc\", partitionKeys=\"a_s\"), by=\"a_s desc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s desc\")");
+ pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, group(search(collection1, q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc,a_f asc\", partitionKeys=\"a_s\"), by=\"a_s\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s desc\")");
tuples = getTuples(pstream);
Modified: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java?rev=1713204&r1=1713203&r2=1713204&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java Sun Nov 8 03:05:13 2015
@@ -133,10 +133,10 @@ public class StreamExpressionToExpession
// Basic test
stream = new ReducerStream(StreamExpressionParser.parse("group("
+ "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc, a_f asc\"),"
- + "by=\"a_s desc\")"), factory);
+ + "by=\"a_s\")"), factory);
expressionString = stream.toExpression(factory).toString();
assertTrue(expressionString.contains("group(search(collection1"));
- assertTrue(expressionString.contains("by=\"a_s desc\""));
+ assertTrue(expressionString.contains("by=a_s"));
}
@Test
Modified: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java?rev=1713204&r1=1713203&r2=1713204&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java Sun Nov 8 03:05:13 2015
@@ -355,7 +355,7 @@ public class StreamingTest extends Abstr
//Test with spaces in the parameter lists.
Map paramsA = mapParams("q","*:*","fl","id,a_s, a_i, a_f","sort", "a_s asc , a_f asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
- ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
+ ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
List<Tuple> tuples = getTuples(rstream);
@@ -374,6 +374,27 @@ public class StreamingTest extends Abstr
List<Map> maps2 = t2.getMaps();
assertMaps(maps2, 4, 6);
+ //Test with spaces in the parameter lists using a comparator
+ paramsA = mapParams("q","*:*","fl","id,a_s, a_i, a_f","sort", "a_s asc , a_f asc");
+ stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+ rstream = new ReducerStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+
+ tuples = getTuples(rstream);
+
+ assert(tuples.size() == 3);
+ assertOrder(tuples, 0,3,4);
+
+ t0 = tuples.get(0);
+ maps0 = t0.getMaps();
+ assertMaps(maps0, 0, 2, 1, 9);
+
+ t1 = tuples.get(1);
+ maps1 = t1.getMaps();
+ assertMaps(maps1, 3, 5, 7, 8);
+
+ t2 = tuples.get(2);
+ maps2 = t2.getMaps();
+ assertMaps(maps2, 4, 6);
del("*:*");
@@ -402,7 +423,7 @@ public class StreamingTest extends Abstr
//Test with spaces in the parameter lists.
Map paramsA = mapParams("q", "blah", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
- ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+ ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
List<Tuple> tuples = getTuples(rstream);
@@ -433,7 +454,7 @@ public class StreamingTest extends Abstr
Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
- ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
+ ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
List<Tuple> tuples = getTuples(pstream);
@@ -457,7 +478,7 @@ public class StreamingTest extends Abstr
paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
stream = new CloudSolrStream(zkHost, "collection1", paramsA);
- rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.DESCENDING));
+ rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.DESCENDING));
tuples = getTuples(pstream);
@@ -1474,7 +1495,7 @@ public class StreamingTest extends Abstr
Map paramsA = mapParams("q","blah","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
- ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+ ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
List<Tuple> tuples = getTuples(pstream);