You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@lucene.apache.org by dp...@apache.org on 2015/11/08 04:05:14 UTC

svn commit: r1713204 - in /lucene/dev/trunk/solr: ./ solrj/src/java/org/apache/solr/client/solrj/io/stream/ solrj/src/test/org/apache/solr/client/solrj/io/stream/

Author: dpgove
Date: Sun Nov  8 03:05:13 2015
New Revision: 1713204

URL: http://svn.apache.org/viewvc?rev=1713204&view=rev
Log:
SOLR-8198: Change ReducerStream to use StreamEqualitor instead of StreamComparator


Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java
    lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
    lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java
    lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1713204&r1=1713203&r2=1713204&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Sun Nov  8 03:05:13 2015
@@ -85,6 +85,8 @@ New Features
 
 * SOLR-7938: MergeStream now supports merging more than 2 streams together (Dennis Gove)
 
+* SOLR-8198: Change ReducerStream to use StreamEqualitor instead of StreamComparator (Dennis Gove)
+
 Optimizations
 ----------------------
 * SOLR-7876: Speed up queries and operations that use many terms when timeAllowed has not been

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java?rev=1713204&r1=1713203&r2=1713204&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java Sun Nov  8 03:05:13 2015
@@ -26,7 +26,11 @@ import java.util.Map;
 
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
+import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
+import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
 import org.apache.solr.client.solrj.io.stream.expr.Expressible;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
@@ -53,14 +57,33 @@ public class ReducerStream extends Tuple
   private static final long serialVersionUID = 1;
 
   private PushBackStream stream;
-  private StreamComparator comp;
+  private StreamEqualitor eq;
 
   private transient Tuple currentGroupHead;
+  
+  public ReducerStream(TupleStream stream,StreamEqualitor eq) throws IOException {
+    init(stream,eq);
+  }
 
   public ReducerStream(TupleStream stream,StreamComparator comp) throws IOException {
-    init(stream,comp);
+    init(stream, convertToEqualitor(comp));
   }
   
+  private StreamEqualitor convertToEqualitor(StreamComparator comp){
+    if(comp instanceof MultipleFieldComparator){
+      MultipleFieldComparator mComp = (MultipleFieldComparator)comp;
+      StreamEqualitor[] eqs = new StreamEqualitor[mComp.getComps().length];
+      for(int idx = 0; idx < mComp.getComps().length; ++idx){
+        eqs[idx] = convertToEqualitor(mComp.getComps()[idx]);
+      }
+      return new MultipleFieldEqualitor(eqs);
+    }
+    else{
+      FieldComparator fComp = (FieldComparator)comp;
+      return new FieldEqualitor(fComp.getFieldName());
+    }
+  }
+
   public ReducerStream(StreamExpression expression, StreamFactory factory) throws IOException{
     // grab all parameters out
     List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
@@ -81,15 +104,15 @@ public class ReducerStream extends Tuple
     // Reducing is always done over equality, so always use an EqualTo comparator
     
     init(factory.constructStream(streamExpressions.get(0)),
-         factory.constructComparator(((StreamExpressionValue)byExpression.getParameter()).getValue(), FieldComparator.class)
+         factory.constructEqualitor(((StreamExpressionValue)byExpression.getParameter()).getValue(), FieldEqualitor.class)
         );
   }
   
-  private void init(TupleStream stream, StreamComparator comp) throws IOException{
+  private void init(TupleStream stream, StreamEqualitor eq) throws IOException{
     this.stream = new PushBackStream(stream);
-    this.comp = comp;
+    this.eq = eq;
     
-    if(!comp.isDerivedFrom(stream.getStreamSort())){
+    if(!eq.isDerivedFrom(stream.getStreamSort())){
       throw new IOException("Invalid ReducerStream - substream comparator (sort) must be a superset of this stream's comparator.");
     }
   }
@@ -103,8 +126,8 @@ public class ReducerStream extends Tuple
     expression.addParameter(stream.toExpression(factory));
     
     // over
-    if(comp instanceof Expressible){
-      expression.addParameter(new StreamExpressionNamedParameter("by",((Expressible)comp).toExpression(factory)));
+    if(eq instanceof Expressible){
+      expression.addParameter(new StreamExpressionNamedParameter("by",((Expressible)eq).toExpression(factory)));
     }
     else{
       throw new IOException("This ReducerStream contains a non-expressible comparator - it cannot be converted to an expression");
@@ -155,7 +178,7 @@ public class ReducerStream extends Tuple
         currentGroupHead = t;
         maps.add(t.getMap());
       } else {
-        if(comp.compare(currentGroupHead, t) == 0) {
+        if(eq.test(currentGroupHead, t)) {
           maps.add(t.getMap());
         } else {
           Tuple groupHead = currentGroupHead.clone();
@@ -170,7 +193,7 @@ public class ReducerStream extends Tuple
   
   /** Return the stream sort - ie, the order in which records are returned */
   public StreamComparator getStreamSort(){
-    return comp;
+    return stream.getStreamSort();
   }
 
   public int getCost() {

Modified: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java?rev=1713204&r1=1713203&r2=1713204&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java Sun Nov  8 03:05:13 2015
@@ -444,7 +444,7 @@ public class StreamExpressionTest extend
     // basic
     expression = StreamExpressionParser.parse("group("
                                               + "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\"),"
-                                              + "by=\"a_s asc\")");
+                                              + "by=\"a_s\")");
     stream = new ReducerStream(expression, factory);
     tuples = getTuples(stream);
 
@@ -466,7 +466,7 @@ public class StreamExpressionTest extend
     // basic w/spaces
     expression = StreamExpressionParser.parse("group("
                                               + "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f       asc\"),"
-                                              + "by=\"a_s asc\")");
+                                              + "by=\"a_s\")");
     stream = new ReducerStream(expression, factory);
     tuples = getTuples(stream);
 
@@ -672,7 +672,7 @@ public class StreamExpressionTest extend
         .withFunctionName("group", ReducerStream.class)
         .withFunctionName("parallel", ParallelStream.class);
 
-    ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, group(search(collection1, q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc,a_f asc\", partitionKeys=\"a_s\"), by=\"a_s asc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s asc\")");
+    ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, group(search(collection1, q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc,a_f asc\", partitionKeys=\"a_s\"), by=\"a_s\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s asc\")");
 
     List<Tuple> tuples = getTuples(pstream);
 
@@ -693,7 +693,7 @@ public class StreamExpressionTest extend
 
     //Test Descending with Ascending subsort
 
-    pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, group(search(collection1, q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc,a_f asc\", partitionKeys=\"a_s\"), by=\"a_s desc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s desc\")");
+    pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, group(search(collection1, q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc,a_f asc\", partitionKeys=\"a_s\"), by=\"a_s\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s desc\")");
 
     tuples = getTuples(pstream);
 

Modified: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java?rev=1713204&r1=1713203&r2=1713204&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java Sun Nov  8 03:05:13 2015
@@ -133,10 +133,10 @@ public class StreamExpressionToExpession
     // Basic test
     stream = new ReducerStream(StreamExpressionParser.parse("group("
                                                   + "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc, a_f asc\"),"
-                                                  + "by=\"a_s desc\")"), factory);
+                                                  + "by=\"a_s\")"), factory);
     expressionString = stream.toExpression(factory).toString();
     assertTrue(expressionString.contains("group(search(collection1"));
-    assertTrue(expressionString.contains("by=\"a_s desc\""));
+    assertTrue(expressionString.contains("by=a_s"));
   }
   
   @Test

Modified: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java?rev=1713204&r1=1713203&r2=1713204&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java Sun Nov  8 03:05:13 2015
@@ -355,7 +355,7 @@ public class StreamingTest extends Abstr
     //Test with spaces in the parameter lists.
     Map paramsA = mapParams("q","*:*","fl","id,a_s, a_i,  a_f","sort", "a_s asc  ,  a_f   asc");
     CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
-    ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
+    ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
 
     List<Tuple> tuples = getTuples(rstream);
 
@@ -374,6 +374,27 @@ public class StreamingTest extends Abstr
     List<Map> maps2 = t2.getMaps();
     assertMaps(maps2, 4, 6);
 
+    //Test with spaces in the parameter lists using a comparator
+    paramsA = mapParams("q","*:*","fl","id,a_s, a_i,  a_f","sort", "a_s asc  ,  a_f   asc");
+    stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    rstream = new ReducerStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+
+    tuples = getTuples(rstream);
+
+    assert(tuples.size() == 3);
+    assertOrder(tuples, 0,3,4);
+
+    t0 = tuples.get(0);
+    maps0 = t0.getMaps();
+    assertMaps(maps0, 0, 2, 1, 9);
+
+    t1 = tuples.get(1);
+    maps1 = t1.getMaps();
+    assertMaps(maps1, 3, 5, 7, 8);
+
+    t2 = tuples.get(2);
+    maps2 = t2.getMaps();
+    assertMaps(maps2, 4, 6);
 
 
     del("*:*");
@@ -402,7 +423,7 @@ public class StreamingTest extends Abstr
     //Test with spaces in the parameter lists.
     Map paramsA = mapParams("q", "blah", "fl", "id,a_s, a_i,  a_f", "sort", "a_s asc  ,  a_f   asc");
     CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
-    ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+    ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
 
     List<Tuple> tuples = getTuples(rstream);
 
@@ -433,7 +454,7 @@ public class StreamingTest extends Abstr
 
     Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
     CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
-    ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
+    ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
     ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
 
     List<Tuple> tuples = getTuples(pstream);
@@ -457,7 +478,7 @@ public class StreamingTest extends Abstr
 
     paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
     stream = new CloudSolrStream(zkHost, "collection1", paramsA);
-    rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.DESCENDING));
+    rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
     pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.DESCENDING));
 
     tuples = getTuples(pstream);
@@ -1474,7 +1495,7 @@ public class StreamingTest extends Abstr
 
     Map paramsA = mapParams("q","blah","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
     CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
-    ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+    ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
     ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
 
     List<Tuple> tuples = getTuples(pstream);