You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2015/07/22 22:48:00 UTC

svn commit: r1692327 - in /lucene/dev/branches/branch_5x: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/handler/ solr/solrj/ solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/ solr...

Author: jbernste
Date: Wed Jul 22 20:47:59 2015
New Revision: 1692327

URL: http://svn.apache.org/r1692327
Log:
SOLR-7554: Add checks in Streams for incoming stream order

Added:
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java
      - copied unchanged from r1687258, lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultipleFieldComparator.java
      - copied unchanged from r1687258, lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultipleFieldComparator.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/FieldEqualitor.java
      - copied unchanged from r1687258, lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/FieldEqualitor.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/MultipleFieldEqualitor.java
      - copied unchanged from r1687258, lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/MultipleFieldEqualitor.java
Removed:
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultiComp.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/MultiEqualitor.java
Modified:
    lucene/dev/branches/branch_5x/   (props changed)
    lucene/dev/branches/branch_5x/solr/   (props changed)
    lucene/dev/branches/branch_5x/solr/core/   (props changed)
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
    lucene/dev/branches/branch_5x/solr/solrj/   (props changed)
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/StreamEqualitor.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
    lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CountStream.java
    lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
    lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java
    lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/SQLHandler.java?rev=1692327&r1=1692326&r2=1692327&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/SQLHandler.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/SQLHandler.java Wed Jul 22 20:47:59 2015
@@ -32,8 +32,9 @@ import com.google.common.base.Strings;
 import com.google.common.collect.Iterables;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
-import org.apache.solr.client.solrj.io.comp.MultiComp;
 import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
 import org.apache.solr.client.solrj.io.stream.ParallelStream;
 import org.apache.solr.client.solrj.io.stream.RankStream;
@@ -171,7 +172,7 @@ public class SQLHandler extends RequestH
     if(numWorkers > 1) {
       // Do the rollups in parallel
       // Maintain the sort of the Tuples coming from the workers.
-      Comparator<Tuple> comp = bucketSortComp(buckets, sortDirection);
+      StreamComparator comp = bucketSortComp(buckets, sortDirection);
       tupleStream = new ParallelStream(workerZkHost, workerCollection, tupleStream, numWorkers, comp);
     }
 
@@ -185,7 +186,7 @@ public class SQLHandler extends RequestH
     if(sqlVisitor.sorts != null && sqlVisitor.sorts.size() > 0) {
       if(!sortsEqual(buckets, sortDirection, sqlVisitor.sorts)) {
         int limit = sqlVisitor.limit == -1 ? 100 : sqlVisitor.limit;
-        Comparator<Tuple> comp = getComp(sqlVisitor.sorts);
+        StreamComparator comp = getComp(sqlVisitor.sorts);
         //Rank the Tuples
         //If parallel stream is used ALL the Rolled up tuples from the workers will be ranked
         //Providing a true Top or Bottom.
@@ -311,35 +312,35 @@ public class SQLHandler extends RequestH
     return "asc";
   }
 
-  private static Comparator<Tuple> bucketSortComp(Bucket[] buckets, String dir) {
-    Comparator<Tuple>[] comps = new Comparator[buckets.length];
+  private static StreamComparator bucketSortComp(Bucket[] buckets, String dir) {
+    FieldComparator[] comps = new FieldComparator[buckets.length];
     for(int i=0; i<buckets.length; i++) {
       ComparatorOrder comparatorOrder = ascDescComp(dir);
       String sortKey = buckets[i].toString();
-      comps[i] = new StreamComparator(stripQuotes(sortKey), comparatorOrder);
+      comps[i] = new FieldComparator(stripQuotes(sortKey), comparatorOrder);
     }
 
     if(comps.length == 1) {
       return comps[0];
     } else {
-      return new MultiComp(comps);
+      return new MultipleFieldComparator(comps);
     }
   }
 
-  private static Comparator<Tuple> getComp(List<SortItem> sortItems) {
-    Comparator<Tuple>[] comps = new Comparator[sortItems.size()];
+  private static StreamComparator getComp(List<SortItem> sortItems) {
+    FieldComparator[] comps = new FieldComparator[sortItems.size()];
     for(int i=0; i<sortItems.size(); i++) {
       SortItem sortItem = sortItems.get(i);
       String ordering = sortItem.getOrdering().toString();
       ComparatorOrder comparatorOrder = ascDescComp(ordering);
       String sortKey = sortItem.getSortKey().toString();
-      comps[i] = new StreamComparator(stripQuotes(sortKey), comparatorOrder);
+      comps[i] = new FieldComparator(stripQuotes(sortKey), comparatorOrder);
     }
 
     if(comps.length == 1) {
       return comps[0];
     } else {
-      return new MultiComp(comps);
+      return new MultipleFieldComparator(comps);
     }
   }
 
@@ -671,6 +672,10 @@ public class SQLHandler extends RequestH
       return children;
     }
 
+    public StreamComparator getStreamSort(){
+      return stream.getStreamSort();
+    }
+
     public void setStreamContext(StreamContext context) {
       stream.setStreamContext(context);
     }
@@ -708,6 +713,10 @@ public class SQLHandler extends RequestH
       this.stream.close();
     }
 
+    public StreamComparator getStreamSort(){
+      return stream.getStreamSort();
+    }
+
     public List<TupleStream> children() {
       List<TupleStream> children = new ArrayList();
       children.add(stream);

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java?rev=1692327&r1=1692326&r2=1692327&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java Wed Jul 22 20:47:59 2015
@@ -26,92 +26,7 @@ import org.apache.solr.client.solrj.io.s
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 
-/**
- *  An equality field Comparator which compares a field of two Tuples and determines sort order.
- **/
-public class StreamComparator implements Comparator<Tuple>, Expressible, Serializable {
-
-  private static final long serialVersionUID = 1;
-  
-  private String leftField;
-  private String rightField;
-  private final ComparatorOrder order;
-  private ComparatorLambda comparator;
-  
-  public StreamComparator(String field, ComparatorOrder order) {
-    this.leftField = field;
-    this.rightField = field;
-    this.order = order;
-    assignComparator();
-  }
-  public StreamComparator(String leftField, String rightField, ComparatorOrder order){
-    this.leftField = leftField;
-    this.rightField = rightField;
-    this.order = order;
-    assignComparator();
-  }
-  
-  public StreamExpressionParameter toExpression(StreamFactory factory){
-    StringBuilder sb = new StringBuilder();
-    
-    sb.append(leftField);
-    
-    if(!leftField.equals(rightField)){
-      sb.append("=");
-      sb.append(rightField); 
-    }
-    
-    sb.append(" ");
-    sb.append(order);
-    
-    return new StreamExpressionValue(sb.toString());
-  }
-  
-  /*
-   * What're we doing here messing around with lambdas for the comparator logic?
-   * We want the compare(...) function to run as fast as possible because it will be called many many
-   * times over the lifetime of this object. For that reason we want to limit the number of comparisons
-   * taking place in the compare(...) function. Because this class supports both ascending and
-   * descending comparisons and the logic for each is slightly different, we want to do the 
-   *   if(ascending){ compare like this } else { compare like this }
-   * check only once - we can do that in the constructor of this class, create a lambda, and then execute 
-   * that lambda in the compare function. A little bit of branch prediction savings right here.
-   */
-  private void assignComparator(){
-    if(ComparatorOrder.DESCENDING == order){
-      comparator = new ComparatorLambda() {
-        @Override
-        public int compare(Tuple leftTuple, Tuple rightTuple) {
-          Comparable leftComp = (Comparable)leftTuple.get(leftField);
-          Comparable rightComp = (Comparable)rightTuple.get(rightField);
-          
-          if(leftComp == rightComp){ return 0; } // if both null then they are equal. if both are same ref then are equal
-          if(null == leftComp){ return 1; }
-          if(null == rightComp){ return -1; }
-          
-          return rightComp.compareTo(leftComp);
-        }
-      };
-    }
-    else{
-      // See above for black magic reasoning.
-      comparator = new ComparatorLambda() {
-        @Override
-        public int compare(Tuple leftTuple, Tuple rightTuple) {
-          Comparable leftComp = (Comparable)leftTuple.get(leftField);
-          Comparable rightComp = (Comparable)rightTuple.get(rightField);
-          
-          if(leftComp == rightComp){ return 0; } // if both null then they are equal. if both are same ref then are equal
-          if(null == leftComp){ return -1; }
-          if(null == rightComp){ return 1; }
-          
-          return leftComp.compareTo(rightComp);
-        }
-      };
-    }
-  }
-
-  public int compare(Tuple leftTuple, Tuple rightTuple) {
-    return comparator.compare(leftTuple, rightTuple); 
-  }
+/** Defines a comparator we can use with TupleStreams */
+public interface StreamComparator extends Comparator<Tuple>, Expressible, Serializable {
+  public boolean isDerivedFrom(StreamComparator base);
 }
\ No newline at end of file

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/StreamEqualitor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/StreamEqualitor.java?rev=1692327&r1=1692326&r2=1692327&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/StreamEqualitor.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/StreamEqualitor.java Wed Jul 22 20:47:59 2015
@@ -18,54 +18,13 @@
 package org.apache.solr.client.solrj.io.eq;
 
 import java.io.Serializable;
-import java.util.Comparator;
 
 import org.apache.solr.client.solrj.io.Tuple;
-import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.client.solrj.io.stream.expr.Expressible;
-import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
-import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
-import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 
-/**
- *  An equality field Equalitor which compares a field of two Tuples and determines if they are equal.
- **/
-public class StreamEqualitor implements Equalitor<Tuple>, Expressible, Serializable {
-
-  private static final long serialVersionUID = 1;
-  
-  private String leftFieldName;
-  private String rightFieldName;
-  private StreamComparator comparator;
-  
-  public StreamEqualitor(String fieldName) {
-    init(fieldName, fieldName);
-  }
-  public StreamEqualitor(String leftFieldName, String rightFieldName){
-    init(leftFieldName, rightFieldName);
-  }
-  
-  private void init(String leftFieldName, String rightFieldName){
-    this.leftFieldName = leftFieldName;
-    this.rightFieldName = rightFieldName;
-    this.comparator = new StreamComparator(leftFieldName, rightFieldName, ComparatorOrder.ASCENDING);
-  }
-  
-  public StreamExpressionParameter toExpression(StreamFactory factory){
-    StringBuilder sb = new StringBuilder();
-    
-    sb.append(leftFieldName);
-    
-    if(!leftFieldName.equals(rightFieldName)){
-      sb.append("=");
-      sb.append(rightFieldName); 
-    }
-    
-    return new StreamExpressionValue(sb.toString());
-  }
-  
-  public boolean test(Tuple leftTuple, Tuple rightTuple) {
-    return 0 == comparator.compare(leftTuple, rightTuple); 
-  }
+/** Defines an equalitor we can use with TupleStreams */
+public interface StreamEqualitor extends Equalitor<Tuple>, Expressible, Serializable {
+  public boolean isDerivedFrom(StreamEqualitor base);
+  public boolean isDerivedFrom(StreamComparator base);
 }
\ No newline at end of file

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java?rev=1692327&r1=1692326&r2=1692327&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java Wed Jul 22 20:47:59 2015
@@ -38,7 +38,8 @@ import org.apache.solr.client.solrj.impl
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
-import org.apache.solr.client.solrj.io.comp.MultiComp;
+import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.client.solrj.io.stream.expr.Expressible;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
@@ -69,7 +70,7 @@ public class CloudSolrStream extends Tup
   protected String collection;
   protected Map<String,String> params;
   private Map<String, String> fieldMappings;
-  protected Comparator<Tuple> comp;
+  protected StreamComparator comp;
   private int zkConnectTimeout = 10000;
   private int zkClientTimeout = 10000;
   private int numWorkers;
@@ -242,7 +243,7 @@ public class CloudSolrStream extends Tup
     return solrStreams;
   }
 
-  private Comparator<Tuple> parseComp(String sort, String fl) throws IOException {
+  private StreamComparator parseComp(String sort, String fl) throws IOException {
 
     String[] fls = fl.split(",");
     HashSet fieldSet = new HashSet();
@@ -251,7 +252,7 @@ public class CloudSolrStream extends Tup
     }
 
     String[] sorts = sort.split(",");
-    Comparator[] comps = new Comparator[sorts.length];
+    StreamComparator[] comps = new StreamComparator[sorts.length];
     for(int i=0; i<sorts.length; i++) {
       String s = sorts[i];
 
@@ -269,11 +270,11 @@ public class CloudSolrStream extends Tup
         fieldName = fieldMappings.get(fieldName);
       }
       
-      comps[i] = new StreamComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING);
+      comps[i] = new FieldComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING);
     }
 
     if(comps.length > 1) {
-      return new MultiComp(comps);
+      return new MultipleFieldComparator(comps);
     } else {
       return comps[0];
     }
@@ -351,6 +352,11 @@ public class CloudSolrStream extends Tup
       cloudSolrClient.close();
     }
   }
+  
+  /** Return the stream sort - ie, the order in which records are returned */
+  public StreamComparator getStreamSort(){
+    return comp;
+  }
 
   public Tuple read() throws IOException {
     return _read();

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java?rev=1692327&r1=1692326&r2=1692327&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java Wed Jul 22 20:47:59 2015
@@ -19,11 +19,11 @@ package org.apache.solr.client.solrj.io.
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Locale;
 
 import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.client.solrj.io.stream.expr.Expressible;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
@@ -43,12 +43,10 @@ public class MergeStream extends TupleSt
 
   private PushBackStream streamA;
   private PushBackStream streamB;
-  private Comparator<Tuple> comp;
+  private StreamComparator comp;
 
-  public MergeStream(TupleStream streamA, TupleStream streamB, Comparator<Tuple> comp) {
-    this.streamA = new PushBackStream(streamA);
-    this.streamB = new PushBackStream(streamB);
-    this.comp = comp;
+  public MergeStream(TupleStream streamA, TupleStream streamB, StreamComparator comp) throws IOException {
+    init(streamA, streamB, comp);
   }
   
   public MergeStream(StreamExpression expression,StreamFactory factory) throws IOException {
@@ -64,15 +62,26 @@ public class MergeStream extends TupleSt
     if(2 != streamExpressions.size()){
       throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting two streams but found %d (must be PushBackStream types)",expression, streamExpressions.size()));
     }
-    this.streamA = new PushBackStream(factory.constructStream(streamExpressions.get(0)));
-    this.streamB = new PushBackStream(factory.constructStream(streamExpressions.get(1)));
-    
+
     if(null == onExpression || !(onExpression.getParameter() instanceof StreamExpressionValue)){
       throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'on' parameter listing fields to merge on but didn't find one",expression));
     }
     
-    // Merge is always done over equality, so always use an EqualTo comparator
-    this.comp = factory.constructComparator(((StreamExpressionValue)onExpression.getParameter()).getValue(), StreamComparator.class);
+    init( factory.constructStream(streamExpressions.get(0)),
+          factory.constructStream(streamExpressions.get(1)),
+          factory.constructComparator(((StreamExpressionValue)onExpression.getParameter()).getValue(), FieldComparator.class)
+        );
+  }
+  
+  private void init(TupleStream streamA, TupleStream streamB, StreamComparator comp) throws IOException {
+    this.streamA = new PushBackStream(streamA);
+    this.streamB = new PushBackStream(streamB);
+    this.comp = comp;
+
+    // streamA and streamB must both be sorted such that comp can be derived from their sorts
+    if(!comp.isDerivedFrom(streamA.getStreamSort()) || !comp.isDerivedFrom(streamB.getStreamSort())){
+      throw new IOException("Invalid MergeStream - both substream comparators (sort) must be a superset of this stream's comparator.");
+    }
   }
   
   @Override
@@ -85,12 +94,7 @@ public class MergeStream extends TupleSt
     expression.addParameter(streamB.toExpression(factory));
     
     // on
-    if(comp instanceof Expressible){
-      expression.addParameter(new StreamExpressionNamedParameter("on",((Expressible)comp).toExpression(factory)));
-    }
-    else{
-      throw new IOException("This MergeStream contains a non-expressible comparator - it cannot be converted to an expression");
-    }
+    expression.addParameter(new StreamExpressionNamedParameter("on",comp.toExpression(factory)));
     
     return expression;   
   }
@@ -145,6 +149,12 @@ public class MergeStream extends TupleSt
       return b;
     }
   }
+  
+  /** Return the stream sort - ie, the order in which records are returned */
+  public StreamComparator getStreamSort(){
+    return comp;
+  }
+
 
   public int getCost() {
     return 0;

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java?rev=1692327&r1=1692326&r2=1692327&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java Wed Jul 22 20:47:59 2015
@@ -24,7 +24,6 @@ import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -34,6 +33,7 @@ import java.util.Map.Entry;
 import java.util.Random;
 
 import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.client.solrj.io.stream.expr.Expressible;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
@@ -65,7 +65,7 @@ public class ParallelStream extends Clou
                         String collection,
                         TupleStream tupleStream,
                         int workers,
-                        Comparator<Tuple> comp) throws IOException {
+                        StreamComparator comp) throws IOException {
     init(zkHost,collection,tupleStream,workers,comp);
   }
 
@@ -74,7 +74,7 @@ public class ParallelStream extends Clou
                         String collection,
                         String expressionString,
                         int workers,
-                        Comparator<Tuple> comp) throws IOException {
+                        StreamComparator comp) throws IOException {
     objectSerialize = false;
     TupleStream tStream = this.streamFactory.constructStream(expressionString);
     init(zkHost,collection, tStream, workers,comp);
@@ -140,12 +140,12 @@ public class ParallelStream extends Clou
     
     // We've got all the required items    
     TupleStream stream = factory.constructStream(streamExpressions.get(0));
-    Comparator<Tuple> comp = factory.constructComparator(((StreamExpressionValue)sortExpression.getParameter()).getValue(), StreamComparator.class);
+    StreamComparator comp = factory.constructComparator(((StreamExpressionValue)sortExpression.getParameter()).getValue(), FieldComparator.class);
     streamFactory = factory;
     init(zkHost,collectionName,stream,workersInt,comp);
   }
 
-  private void init(String zkHost,String collection,TupleStream tupleStream,int workers,Comparator<Tuple> comp) throws IOException{
+  private void init(String zkHost,String collection,TupleStream tupleStream,int workers,StreamComparator comp) throws IOException{
     this.zkHost = zkHost;
     this.collection = collection;
     this.workers = workers;
@@ -179,12 +179,7 @@ public class ParallelStream extends Clou
     }
         
     // sort
-    if(comp instanceof Expressible){
-      expression.addParameter(new StreamExpressionNamedParameter("sort",((Expressible)comp).toExpression(factory)));
-    }
-    else{
-      throw new IOException("This ParallelStream contains a non-expressible comparator - it cannot be converted to an expression");
-    }
+    expression.addParameter(new StreamExpressionNamedParameter("sort",comp.toExpression(factory)));
     
     // zkHost
     expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java?rev=1692327&r1=1692326&r2=1692327&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java Wed Jul 22 20:47:59 2015
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.client.solrj.io.stream.expr.Expressible;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
@@ -82,6 +83,13 @@ public class PushBackStream extends Tupl
       return stream.read();
     }
   }
+  
+  /** Return the stream sort - ie, the order in which records are returned
+   *  This returns the streamSort of the substream */
+  public StreamComparator getStreamSort(){
+    return stream.getStreamSort();
+  }
+
 
   public int getCost() {
     return 0;

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java?rev=1692327&r1=1692326&r2=1692327&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java Wed Jul 22 20:47:59 2015
@@ -27,6 +27,7 @@ import java.util.Locale;
 import java.util.PriorityQueue;
 
 import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.client.solrj.io.stream.expr.Expressible;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
@@ -43,14 +44,14 @@ public class RankStream extends TupleStr
 
   private static final long serialVersionUID = 1;
 
-  private TupleStream tupleStream;
-  private Comparator<Tuple> comp;
+  private TupleStream stream;
+  private StreamComparator comp;
   private int size;
   private transient PriorityQueue<Tuple> top;
   private transient boolean finished = false;
   private transient LinkedList<Tuple> topList;
 
-  public RankStream(TupleStream tupleStream, int size, Comparator<Tuple> comp) {
+  public RankStream(TupleStream tupleStream, int size, StreamComparator comp) throws IOException {
     init(tupleStream,size,comp);
   }
   
@@ -87,15 +88,17 @@ public class RankStream extends TupleStr
     }
     
     TupleStream stream = factory.constructStream(streamExpressions.get(0));
-    Comparator<Tuple> comp = factory.constructComparator(((StreamExpressionValue)sortExpression.getParameter()).getValue(), StreamComparator.class);
+    StreamComparator comp = factory.constructComparator(((StreamExpressionValue)sortExpression.getParameter()).getValue(), FieldComparator.class);
     
     init(stream,nInt,comp);    
   }
   
-  private void init(TupleStream tupleStream, int size, Comparator<Tuple> comp){
-    this.tupleStream = tupleStream;
+  private void init(TupleStream tupleStream, int size, StreamComparator comp) throws IOException{
+    this.stream = tupleStream;
     this.comp = comp;
     this.size = size;
+    
+    // Rank stream does not demand that its order is derivable from the order of the incoming stream. No derivation check required
   }
   
   @Override
@@ -107,52 +110,47 @@ public class RankStream extends TupleStr
     expression.addParameter(new StreamExpressionNamedParameter("n", Integer.toString(size)));
     
     // stream
-    if(tupleStream instanceof Expressible){
-      expression.addParameter(((Expressible)tupleStream).toExpression(factory));
+    if(stream instanceof Expressible){
+      expression.addParameter(((Expressible)stream).toExpression(factory));
     }
     else{
       throw new IOException("This RankStream contains a non-expressible TupleStream - it cannot be converted to an expression");
     }
         
     // sort
-    if(comp instanceof Expressible){
-      expression.addParameter(new StreamExpressionNamedParameter("sort",((Expressible)comp).toExpression(factory)));
-    }
-    else{
-      throw new IOException("This RankStream contains a non-expressible comparator - it cannot be converted to an expression");
-    }
+    expression.addParameter(new StreamExpressionNamedParameter("sort",comp.toExpression(factory)));
     
     return expression;   
   }
   
   public void setStreamContext(StreamContext context) {
-    this.tupleStream.setStreamContext(context);
+    this.stream.setStreamContext(context);
   }
 
   public List<TupleStream> children() {
     List<TupleStream> l =  new ArrayList();
-    l.add(tupleStream);
+    l.add(stream);
     return l;
   }
 
   public void open() throws IOException {
     this.top = new PriorityQueue(size, new ReverseComp(comp));
     this.topList = new LinkedList();
-    tupleStream.open();
+    stream.open();
   }
 
   public void close() throws IOException {
-    tupleStream.close();
+    stream.close();
   }
   
-  public Comparator<Tuple> getComparator(){
+  public StreamComparator getComparator(){
     return this.comp;
   }
 
   public Tuple read() throws IOException {
     if(!finished) {
       while(true) {
-        Tuple tuple = tupleStream.read();
+        Tuple tuple = stream.read();
         if(tuple.EOF) {
           finished = true;
           int s = top.size();
@@ -178,6 +176,11 @@ public class RankStream extends TupleStr
 
     return topList.pollFirst();
   }
+  
+  /** Return the stream sort, i.e. the order in which records are returned. */
+  public StreamComparator getStreamSort(){
+    return comp;
+  }
 
   public int getCost() {
     return 0;
@@ -185,14 +188,16 @@ public class RankStream extends TupleStr
 
   class ReverseComp implements Comparator<Tuple>, Serializable {
 
-    private Comparator<Tuple> comp;
+    private StreamComparator comp;
 
-    public ReverseComp(Comparator<Tuple> comp) {
+    public ReverseComp(StreamComparator comp) {
       this.comp = comp;
     }
 
     public int compare(Tuple t1, Tuple t2) {
       return comp.compare(t1, t2)*(-1);
     }
+    
+    
   }
 }
\ No newline at end of file

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java?rev=1692327&r1=1692326&r2=1692327&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java Wed Jul 22 20:47:59 2015
@@ -19,13 +19,13 @@ package org.apache.solr.client.solrj.io.
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 
 import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.client.solrj.io.stream.expr.Expressible;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
@@ -52,15 +52,13 @@ public class ReducerStream extends Tuple
 
   private static final long serialVersionUID = 1;
 
-  private PushBackStream tupleStream;
-  private Comparator<Tuple> comp;
+  private PushBackStream stream;
+  private StreamComparator comp;
 
   private transient Tuple currentGroupHead;
 
-  public ReducerStream(TupleStream tupleStream,
-                       Comparator<Tuple> comp) {
-    this.tupleStream = new PushBackStream(tupleStream);
-    this.comp = comp;
+  public ReducerStream(TupleStream stream,StreamComparator comp) throws IOException {
+    init(stream,comp);
   }
   
   public ReducerStream(StreamExpression expression, StreamFactory factory) throws IOException{
@@ -75,15 +73,25 @@ public class ReducerStream extends Tuple
     
     if(1 != streamExpressions.size()){
       throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
-    }
-    this.tupleStream = new PushBackStream(factory.constructStream(streamExpressions.get(0)));
-    
+    }    
     if(null == byExpression || !(byExpression.getParameter() instanceof StreamExpressionValue)){
       throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'by' parameter listing fields to group by but didn't find one",expression));
     }
     
     // Reducing is always done over equality, so always use an EqualTo comparator
-    this.comp = factory.constructComparator(((StreamExpressionValue)byExpression.getParameter()).getValue(), StreamComparator.class);
+    
+    init(factory.constructStream(streamExpressions.get(0)),
+         factory.constructComparator(((StreamExpressionValue)byExpression.getParameter()).getValue(), FieldComparator.class)
+        );
+  }
+  
+  private void init(TupleStream stream, StreamComparator comp) throws IOException{
+    this.stream = new PushBackStream(stream);
+    this.comp = comp;
+    
+    if(!comp.isDerivedFrom(stream.getStreamSort())){
+      throw new IOException("Invalid ReducerStream - substream comparator (sort) must be a superset of this stream's comparator.");
+    }
   }
 
   @Override
@@ -92,7 +100,7 @@ public class ReducerStream extends Tuple
     StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
     
     // stream
-    expression.addParameter(tupleStream.toExpression(factory));
+    expression.addParameter(stream.toExpression(factory));
     
     // over
     if(comp instanceof Expressible){
@@ -106,32 +114,32 @@ public class ReducerStream extends Tuple
   }
   
   public void setStreamContext(StreamContext context) {
-    this.tupleStream.setStreamContext(context);
+    this.stream.setStreamContext(context);
   }
 
   public List<TupleStream> children() {
     List<TupleStream> l =  new ArrayList();
-    l.add(tupleStream);
+    l.add(stream);
     return l;
   }
 
   public void open() throws IOException {
-    tupleStream.open();
+    stream.open();
   }
 
   public void close() throws IOException {
-    tupleStream.close();
+    stream.close();
   }
 
   public Tuple read() throws IOException {
 
     List<Map> maps = new ArrayList();
     while(true) {
-      Tuple t = tupleStream.read();
+      Tuple t = stream.read();
 
       if(t.EOF) {
        if(maps.size() > 0) {
-         tupleStream.pushBack(t);
+         stream.pushBack(t);
          Map map1 = maps.get(0);
          Map map2 = new HashMap();
          map2.putAll(map1);
@@ -151,7 +159,7 @@ public class ReducerStream extends Tuple
           maps.add(t.getMap());
         } else {
           Tuple groupHead = currentGroupHead.clone();
-          tupleStream.pushBack(t);
+          stream.pushBack(t);
           currentGroupHead = null;
           groupHead.setMaps(maps);
           return groupHead;
@@ -159,6 +167,11 @@ public class ReducerStream extends Tuple
       }
     }
   }
+  
+  /** Return the stream sort, i.e. the order in which records are returned. */
+  public StreamComparator getStreamSort(){
+    return comp;
+  }
 
   public int getCost() {
     return 0;

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java?rev=1692327&r1=1692326&r2=1692327&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java Wed Jul 22 20:47:59 2015
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.HashKey;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
 import org.apache.solr.client.solrj.io.stream.metrics.Metric;
 
@@ -66,6 +67,10 @@ public class RollupStream extends TupleS
     tupleStream.close();
   }
 
+  public StreamComparator getStreamSort(){
+    return tupleStream.getStreamSort();
+  }
+
   public Tuple read() throws IOException {
 
     while(true) {

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java?rev=1692327&r1=1692326&r2=1692327&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java Wed Jul 22 20:47:59 2015
@@ -27,6 +27,7 @@ import java.util.Iterator;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 
@@ -163,6 +164,11 @@ public class SolrStream extends TupleStr
       return new Tuple(fields);
     }
   }
+  
+  /** There is no known sort applied to a SolrStream */
+  public StreamComparator getStreamSort(){
+    return null;
+  }
 
   private Map mapFields(Map fields, Map<String,String> mappings) {
 

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java?rev=1692327&r1=1692326&r2=1692327&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java Wed Jul 22 20:47:59 2015
@@ -22,6 +22,7 @@ import java.io.Serializable;
 import java.util.List;
 
 import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 
@@ -43,7 +44,10 @@ public abstract class TupleStream implem
 
   public abstract Tuple read() throws IOException;
 
+  public abstract StreamComparator getStreamSort();
+  
   public int getCost() {
     return 0;
   }
+  
 }
\ No newline at end of file

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java?rev=1692327&r1=1692326&r2=1692327&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java Wed Jul 22 20:47:59 2015
@@ -24,8 +24,10 @@ import java.util.List;
 import java.util.Locale;
 
 import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.client.solrj.io.eq.Equalitor;
+import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
 import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
 import org.apache.solr.client.solrj.io.stream.expr.Expressible;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
@@ -44,13 +46,12 @@ public class UniqueStream extends TupleS
 
   private static final long serialVersionUID = 1;
 
-  private TupleStream tupleStream;
+  private TupleStream stream;
   private Equalitor<Tuple> eq;
   private transient Tuple currentTuple;
 
-  public UniqueStream(TupleStream tupleStream, Equalitor<Tuple> eq) {
-    this.tupleStream = tupleStream;
-    this.eq = eq;
+  public UniqueStream(TupleStream stream, StreamEqualitor eq) throws IOException {
+    init(stream,eq);
   }
   
   public UniqueStream(StreamExpression expression,StreamFactory factory) throws IOException {
@@ -66,14 +67,21 @@ public class UniqueStream extends TupleS
     if(1 != streamExpressions.size()){
       throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
     }
-    this.tupleStream = factory.constructStream(streamExpressions.get(0));
     
     if(null == overExpression || !(overExpression.getParameter() instanceof StreamExpressionValue)){
       throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'over' parameter listing fields to unique over but didn't find one",expression));
     }
     
-    // Uniqueness is always done over equality, so always use an EqualTo comparator
-    this.eq = factory.constructEqualitor(((StreamExpressionValue)overExpression.getParameter()).getValue(), StreamEqualitor.class);
+    init(factory.constructStream(streamExpressions.get(0)), factory.constructEqualitor(((StreamExpressionValue)overExpression.getParameter()).getValue(), FieldEqualitor.class));
+  }
+  
+  private void init(TupleStream stream, StreamEqualitor eq) throws IOException{
+    this.stream = stream;
+    this.eq = eq;
+
+    if(!eq.isDerivedFrom(stream.getStreamSort())){
+      throw new IOException("Invalid UniqueStream - substream comparator (sort) must be a superset of this stream's equalitor.");
+    }    
   }
 
   @Override
@@ -82,8 +90,8 @@ public class UniqueStream extends TupleS
     StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
     
     // streams
-    if(tupleStream instanceof Expressible){
-      expression.addParameter(((Expressible)tupleStream).toExpression(factory));
+    if(stream instanceof Expressible){
+      expression.addParameter(((Expressible)stream).toExpression(factory));
     }
     else{
       throw new IOException("This UniqueStream contains a non-expressible TupleStream - it cannot be converted to an expression");
@@ -101,25 +109,25 @@ public class UniqueStream extends TupleS
   }
     
   public void setStreamContext(StreamContext context) {
-    this.tupleStream.setStreamContext(context);
+    this.stream.setStreamContext(context);
   }
 
   public List<TupleStream> children() {
     List<TupleStream> l =  new ArrayList<TupleStream>();
-    l.add(tupleStream);
+    l.add(stream);
     return l;
   }
 
   public void open() throws IOException {
-    tupleStream.open();
+    stream.open();
   }
 
   public void close() throws IOException {
-    tupleStream.close();
+    stream.close();
   }
 
   public Tuple read() throws IOException {
-    Tuple tuple = tupleStream.read();
+    Tuple tuple = stream.read();
     if(tuple.EOF) {
       return tuple;
     }
@@ -131,7 +139,7 @@ public class UniqueStream extends TupleS
       while(true) {
         if(eq.test(currentTuple, tuple)){
           //We have duplicate tuple so read the next tuple from the stream.
-          tuple = tupleStream.read();
+          tuple = stream.read();
           if(tuple.EOF) {
             return tuple;
           }
@@ -144,6 +152,11 @@ public class UniqueStream extends TupleS
     }
   }
 
+  /** Return the stream sort, i.e. the order in which records are returned. */
+  public StreamComparator getStreamSort(){
+    return stream.getStreamSort();
+  }
+  
   public int getCost() {
     return 0;
   }

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java?rev=1692327&r1=1692326&r2=1692327&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java Wed Jul 22 20:47:59 2015
@@ -5,7 +5,6 @@ import java.io.Serializable;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
@@ -14,9 +13,11 @@ import java.util.Map.Entry;
 
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
-import org.apache.solr.client.solrj.io.comp.MultiComp;
+import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.client.solrj.io.eq.Equalitor;
-import org.apache.solr.client.solrj.io.eq.MultiEqualitor;
+import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
+import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
 import org.apache.solr.client.solrj.io.stream.TupleStream;
 
 /*
@@ -179,14 +180,14 @@ public class StreamFactory implements Se
     throw new IOException(String.format(Locale.ROOT,"Invalid stream expression %s - function '%s' is unknown (not mapped to a valid TupleStream)", expression, expression.getFunctionName()));
   }
 
-  public Comparator<Tuple> constructComparator(String comparatorString, Class comparatorType) throws IOException {
+  public StreamComparator constructComparator(String comparatorString, Class comparatorType) throws IOException {
     if(comparatorString.contains(",")){
       String[] parts = comparatorString.split(",");
-      Comparator[] comps = new Comparator[parts.length];
+      StreamComparator[] comps = new StreamComparator[parts.length];
       for(int idx = 0; idx < parts.length; ++idx){
         comps[idx] = constructComparator(parts[idx].trim(), comparatorType);
       }
-      return new MultiComp(comps);
+      return new MultipleFieldComparator(comps);
     }
     else{
       String[] parts = comparatorString.split(" ");
@@ -197,18 +198,18 @@ public class StreamFactory implements Se
       String fieldName = parts[0].trim();
       String order = parts[1].trim();
       
-      return (Comparator)createInstance(comparatorType, new Class[]{ String.class, ComparatorOrder.class }, new Object[]{ fieldName, ComparatorOrder.fromString(order) });
+      return (StreamComparator)createInstance(comparatorType, new Class[]{ String.class, ComparatorOrder.class }, new Object[]{ fieldName, ComparatorOrder.fromString(order) });
     }
   }
     
-  public Equalitor<Tuple> constructEqualitor(String equalitorString, Class equalitorType) throws IOException {
+  public StreamEqualitor constructEqualitor(String equalitorString, Class equalitorType) throws IOException {
     if(equalitorString.contains(",")){
       String[] parts = equalitorString.split(",");
-      Equalitor[] eqs = new Equalitor[parts.length];
+      StreamEqualitor[] eqs = new StreamEqualitor[parts.length];
       for(int idx = 0; idx < parts.length; ++idx){
         eqs[idx] = constructEqualitor(parts[idx].trim(), equalitorType);
       }
-      return new MultiEqualitor(eqs);
+      return new MultipleFieldEqualitor(eqs);
     }
     else{
       String leftFieldName;
@@ -227,7 +228,7 @@ public class StreamFactory implements Se
         leftFieldName = rightFieldName = equalitorString.trim();
       }
       
-      return (Equalitor)createInstance(equalitorType, new Class[]{ String.class, String.class }, new Object[]{ leftFieldName, rightFieldName });
+      return (StreamEqualitor)createInstance(equalitorType, new Class[]{ String.class, String.class }, new Object[]{ leftFieldName, rightFieldName });
     }
   }
 

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CountStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CountStream.java?rev=1692327&r1=1692326&r2=1692327&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CountStream.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CountStream.java Wed Jul 22 20:47:59 2015
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Locale;
 
 import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.client.solrj.io.stream.expr.Expressible;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
@@ -96,4 +97,9 @@ public class CountStream extends TupleSt
       return t;
     }
   }
+
+  @Override
+  public StreamComparator getStreamSort() {
+    return null;
+  }
 }
\ No newline at end of file

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java?rev=1692327&r1=1692326&r2=1692327&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java Wed Jul 22 20:47:59 2015
@@ -601,7 +601,12 @@ public class StreamExpressionTest extend
         .withStreamFunction("group", ReducerStream.class)
         .withStreamFunction("parallel", ParallelStream.class);
 
-    ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, top(search(collection1, q=\"*:*\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), n=\"11\", sort=\"a_i desc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_i desc\")");
+    ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel("
+        + "collection1, "
+        + "top("
+          + "search(collection1, q=\"*:*\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), "
+          + "n=\"11\", "
+          + "sort=\"a_i desc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_i desc\")");
 
     List<Tuple> tuples = getTuples(pstream);
 

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java?rev=1692327&r1=1692326&r2=1692327&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java Wed Jul 22 20:47:59 2015
@@ -78,10 +78,10 @@ public class StreamExpressionToExpession
     String expressionString;
     
     // Basic test
-    stream = new UniqueStream(StreamExpressionParser.parse("unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f asc\")"), factory);
+    stream = new UniqueStream(StreamExpressionParser.parse("unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f\")"), factory);
     expressionString = stream.toExpression(factory).toString();
     assertTrue(expressionString.contains("unique(search(collection1"));
-    assertTrue(expressionString.contains("over=\"a_f asc\""));
+    assertTrue(expressionString.contains("over=a_f"));
   }
   
   @Test

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java?rev=1692327&r1=1692326&r2=1692327&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java Wed Jul 22 20:47:59 2015
@@ -29,9 +29,9 @@ import org.apache.lucene.util.LuceneTest
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
-import org.apache.solr.client.solrj.io.comp.MultiComp;
-import org.apache.solr.client.solrj.io.comp.StreamComparator;
-import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
+import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
 import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
@@ -139,11 +139,23 @@ public class StreamingTest extends Abstr
 
     Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
     CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
-    UniqueStream ustream = new UniqueStream(stream, new StreamEqualitor("a_f"));
+    UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
     List<Tuple> tuples = getTuples(ustream);
     assert(tuples.size() == 4);
     assertOrder(tuples, 0,1,3,4);
 
+
+    try {
+      params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
+      stream = new CloudSolrStream(zkHost, "collection1", params);
+      ustream = new UniqueStream(stream, new FieldEqualitor("a_i"));
+      throw new Exception("Equalitors did not match but no excepion was thrown");
+    } catch(Exception e) {
+      if(!e.getMessage().equals("Invalid UniqueStream - substream comparator (sort) must be a superset of this stream's equalitor.")) {
+        throw e;
+      }
+    }
+
     del("*:*");
     commit();
 
@@ -188,7 +200,7 @@ public class StreamingTest extends Abstr
 
     Map paramsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none");
     CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
-    ParallelStream pstream = new ParallelStream(zkHost, "collection1", stream, 2, new StreamComparator("a_s",ComparatorOrder.ASCENDING));
+    ParallelStream pstream = new ParallelStream(zkHost, "collection1", stream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
 
     List<Tuple> tuples = getTuples(pstream);
 
@@ -221,8 +233,8 @@ public class StreamingTest extends Abstr
 
     Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc", "partitionKeys", "a_f");
     CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
-    UniqueStream ustream = new UniqueStream(stream, new StreamEqualitor("a_f"));
-    ParallelStream pstream = new ParallelStream(zkHost, "collection1", ustream, 2, new StreamComparator("a_f",ComparatorOrder.ASCENDING));
+    UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
+    ParallelStream pstream = new ParallelStream(zkHost, "collection1", ustream, 2, new FieldComparator("a_f",ComparatorOrder.ASCENDING));
     List<Tuple> tuples = getTuples(pstream);
     assert(tuples.size() == 5);
     assertOrder(tuples, 0,1,3,4,6);
@@ -255,7 +267,7 @@ public class StreamingTest extends Abstr
 
     Map params = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
     CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
-    RankStream rstream = new RankStream(stream, 3, new StreamComparator("a_i",ComparatorOrder.DESCENDING));
+    RankStream rstream = new RankStream(stream, 3, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
     List<Tuple> tuples = getTuples(rstream);
 
 
@@ -287,8 +299,8 @@ public class StreamingTest extends Abstr
 
     Map params = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
     CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
-    RankStream rstream = new RankStream(stream, 11, new StreamComparator("a_i",ComparatorOrder.DESCENDING));
-    ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new StreamComparator("a_i",ComparatorOrder.DESCENDING));
+    RankStream rstream = new RankStream(stream, 11, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
+    ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
     List<Tuple> tuples = getTuples(pstream);
 
     assert(tuples.size() == 10);
@@ -354,7 +366,7 @@ public class StreamingTest extends Abstr
     //Test with spaces in the parameter lists.
     Map paramsA = mapParams("q","*:*","fl","id,a_s, a_i,  a_f","sort", "a_s asc  ,  a_f   asc");
     CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
-    ReducerStream rstream = new ReducerStream(stream, new StreamComparator("a_s",ComparatorOrder.ASCENDING));
+    ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
 
     List<Tuple> tuples = getTuples(rstream);
 
@@ -373,6 +385,18 @@ public class StreamingTest extends Abstr
     List<Map> maps2 = t2.getMaps();
     assertMaps(maps2, 4, 6);
 
+    try {
+
+      paramsA = mapParams("q","*:*","fl","id,a_s, a_i,  a_f","sort", "a_i asc  ,  a_f   asc");
+      stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+      rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
+      throw new Exception("Sorts did not match up and Exception was not not thrown.");
+    } catch (Exception e) {
+      if(!e.getMessage().equals("Invalid ReducerStream - substream comparator (sort) must be a superset of this stream's comparator.")) {
+        throw e;
+      }
+    }
+
 
 
     del("*:*");
@@ -401,7 +425,7 @@ public class StreamingTest extends Abstr
     //Test with spaces in the parameter lists.
     Map paramsA = mapParams("q", "blah", "fl", "id,a_s, a_i,  a_f", "sort", "a_s asc  ,  a_f   asc");
     CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
-    ReducerStream rstream = new ReducerStream(stream, new StreamComparator("a_s", ComparatorOrder.ASCENDING));
+    ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
 
     List<Tuple> tuples = getTuples(rstream);
 
@@ -432,8 +456,8 @@ public class StreamingTest extends Abstr
 
     Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
     CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
-    ReducerStream rstream = new ReducerStream(stream, new StreamComparator("a_s",ComparatorOrder.ASCENDING));
-    ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new StreamComparator("a_s",ComparatorOrder.ASCENDING));
+    ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
+    ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
 
     List<Tuple> tuples = getTuples(pstream);
 
@@ -456,8 +480,8 @@ public class StreamingTest extends Abstr
 
     paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
     stream = new CloudSolrStream(zkHost, "collection1", paramsA);
-    rstream = new ReducerStream(stream, new StreamComparator("a_s",ComparatorOrder.DESCENDING));
-    pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new StreamComparator("a_s",ComparatorOrder.DESCENDING));
+    rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.DESCENDING));
+    pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.DESCENDING));
 
     tuples = getTuples(pstream);
 
@@ -639,7 +663,7 @@ public class StreamingTest extends Abstr
                         new CountMetric()};
 
     RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
-    ParallelStream parallelStream = new ParallelStream(zkHost, "collection1", rollupStream, 2, new StreamComparator("a_s", ComparatorOrder.ASCENDING));
+    ParallelStream parallelStream = new ParallelStream(zkHost, "collection1", rollupStream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
     List<Tuple> tuples = getTuples(parallelStream);
 
     assert(tuples.size() == 3);
@@ -739,8 +763,8 @@ public class StreamingTest extends Abstr
 
     Map paramsA = mapParams("q","blah","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
     CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
-    ReducerStream rstream = new ReducerStream(stream, new StreamComparator("a_s", ComparatorOrder.ASCENDING));
-    ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new StreamComparator("a_s", ComparatorOrder.ASCENDING));
+    ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+    ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
 
     List<Tuple> tuples = getTuples(pstream);
     assert(tuples.size() == 0);
@@ -809,7 +833,7 @@ public class StreamingTest extends Abstr
     Map paramsB = mapParams("q","id:(0 2 3)","fl","id,a_s,a_i","sort", "a_i asc");
     CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
 
-    MergeStream mstream = new MergeStream(streamA, streamB, new StreamComparator("a_i",ComparatorOrder.ASCENDING));
+    MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
     List<Tuple> tuples = getTuples(mstream);
 
     assert(tuples.size() == 5);
@@ -822,7 +846,7 @@ public class StreamingTest extends Abstr
     paramsB = mapParams("q","id:(0 2 3)","fl","id,a_s,a_i","sort", "a_i desc");
     streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
 
-    mstream = new MergeStream(streamA, streamB, new StreamComparator("a_i",ComparatorOrder.DESCENDING));
+    mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
     tuples = getTuples(mstream);
 
     assert(tuples.size() == 5);
@@ -836,7 +860,7 @@ public class StreamingTest extends Abstr
     paramsB = mapParams("q","id:(0 3)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
     streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
 
-    mstream = new MergeStream(streamA, streamB, new MultiComp(new StreamComparator("a_f",ComparatorOrder.ASCENDING),new StreamComparator("a_i",ComparatorOrder.ASCENDING)));
+    mstream = new MergeStream(streamA, streamB, new MultipleFieldComparator(new FieldComparator("a_f",ComparatorOrder.ASCENDING),new FieldComparator("a_i",ComparatorOrder.ASCENDING)));
     tuples = getTuples(mstream);
 
     assert(tuples.size() == 5);
@@ -848,12 +872,41 @@ public class StreamingTest extends Abstr
     paramsB = mapParams("q","id:(0 3)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i desc");
     streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
 
-    mstream = new MergeStream(streamA, streamB, new MultiComp(new StreamComparator("a_f",ComparatorOrder.ASCENDING),new StreamComparator("a_i",ComparatorOrder.DESCENDING)));
+    mstream = new MergeStream(streamA, streamB, new MultipleFieldComparator(new FieldComparator("a_f",ComparatorOrder.ASCENDING),new FieldComparator("a_i",ComparatorOrder.DESCENDING)));
     tuples = getTuples(mstream);
 
     assert(tuples.size() == 5);
     assertOrder(tuples, 2,0,1,3,4);
 
+    try {
+      paramsA = mapParams("q","id:(2 4 1)","fl","id,a_s,a_i,a_f","sort", "a_f desc,a_i desc");
+      streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
+      // streamA sorts a_f descending but the merge comparator below is a_f ascending, so building the MergeStream is expected to throw.
+      paramsB = mapParams("q","id:(0 3)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i desc");
+      streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+      mstream = new MergeStream(streamA, streamB, new MultipleFieldComparator(new FieldComparator("a_f",ComparatorOrder.ASCENDING),new FieldComparator("a_i",ComparatorOrder.DESCENDING)));
+      throw new Exception("Sorts did not match up and Exception was not thrown.");
+    } catch(Exception e) {
+      if(!"Invalid MergeStream - both substream comparators (sort) must be a superset of this stream's comparator.".equals(e.getMessage())) {
+        throw e; // any other exception (including one with a null message) is a genuine failure
+      }
+    }
+
+    try {
+      paramsA = mapParams("q","id:(2 4 1)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i desc");
+      streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
+      // streamB sorts a_i ascending but the merge comparator below is a_i descending, so building the MergeStream is expected to throw.
+      paramsB = mapParams("q","id:(0 3)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
+      streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+      mstream = new MergeStream(streamA, streamB, new MultipleFieldComparator(new FieldComparator("a_f",ComparatorOrder.ASCENDING),new FieldComparator("a_i",ComparatorOrder.DESCENDING)));
+      throw new Exception("Sorts did not match up and Exception was not thrown.");
+    } catch(Exception e) {
+      if(!"Invalid MergeStream - both substream comparators (sort) must be a superset of this stream's comparator.".equals(e.getMessage())) {
+        throw e; // any other exception (including one with a null message) is a genuine failure
+      }
+    }
+
+
     del("*:*");
     commit();
   }
@@ -884,8 +937,8 @@ public class StreamingTest extends Abstr
     Map paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
     CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
 
-    MergeStream mstream = new MergeStream(streamA, streamB, new StreamComparator("a_i",ComparatorOrder.ASCENDING));
-    ParallelStream pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new StreamComparator("a_i",ComparatorOrder.ASCENDING));
+    MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
+    ParallelStream pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
     List<Tuple> tuples = getTuples(pstream);
 
     assert(tuples.size() == 9);
@@ -898,8 +951,8 @@ public class StreamingTest extends Abstr
     paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i desc", "partitionKeys", "a_i");
     streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
 
-    mstream = new MergeStream(streamA, streamB, new StreamComparator("a_i",ComparatorOrder.DESCENDING));
-    pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new StreamComparator("a_i",ComparatorOrder.DESCENDING));
+    mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
+    pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
     tuples = getTuples(pstream);
 
     assert(tuples.size() == 8);
@@ -934,9 +987,9 @@ public class StreamingTest extends Abstr
     Map paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
     CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
 
-    MergeStream mstream = new MergeStream(streamA, streamB, new StreamComparator("a_i",ComparatorOrder.ASCENDING));
+    MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
     CountStream cstream = new CountStream(mstream);
-    ParallelStream pstream = new ParallelStream(zkHost, "collection1", cstream, 2, new StreamComparator("a_i",ComparatorOrder.ASCENDING));
+    ParallelStream pstream = new ParallelStream(zkHost, "collection1", cstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
     List<Tuple> tuples = getTuples(pstream);
 
     assert(tuples.size() == 9);