You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2015/07/22 23:54:56 UTC
svn commit: r1692329 - in /lucene/dev/branches/branch_5x: ./ solr/
solr/core/ solr/core/src/java/org/apache/solr/handler/ solr/solrj/
solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/
solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/ solr...
Author: jbernste
Date: Wed Jul 22 21:54:55 2015
New Revision: 1692329
URL: http://svn.apache.org/r1692329
Log:
SOLR-7554: Reverting 5x commit
Added:
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultiComp.java
- copied unchanged from r1692326, lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultiComp.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/MultiEqualitor.java
- copied unchanged from r1692326, lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/MultiEqualitor.java
Removed:
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultipleFieldComparator.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/FieldEqualitor.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/MultipleFieldEqualitor.java
Modified:
lucene/dev/branches/branch_5x/ (props changed)
lucene/dev/branches/branch_5x/solr/ (props changed)
lucene/dev/branches/branch_5x/solr/core/ (props changed)
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
lucene/dev/branches/branch_5x/solr/solrj/ (props changed)
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/StreamEqualitor.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CountStream.java
lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java
lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/SQLHandler.java?rev=1692329&r1=1692328&r2=1692329&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/SQLHandler.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/SQLHandler.java Wed Jul 22 21:54:55 2015
@@ -32,9 +32,8 @@ import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
-import org.apache.solr.client.solrj.io.comp.FieldComparator;
-import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.comp.MultiComp;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.ParallelStream;
import org.apache.solr.client.solrj.io.stream.RankStream;
@@ -172,7 +171,7 @@ public class SQLHandler extends RequestH
if(numWorkers > 1) {
// Do the rollups in parallel
// Maintain the sort of the Tuples coming from the workers.
- StreamComparator comp = bucketSortComp(buckets, sortDirection);
+ Comparator<Tuple> comp = bucketSortComp(buckets, sortDirection);
tupleStream = new ParallelStream(workerZkHost, workerCollection, tupleStream, numWorkers, comp);
}
@@ -186,7 +185,7 @@ public class SQLHandler extends RequestH
if(sqlVisitor.sorts != null && sqlVisitor.sorts.size() > 0) {
if(!sortsEqual(buckets, sortDirection, sqlVisitor.sorts)) {
int limit = sqlVisitor.limit == -1 ? 100 : sqlVisitor.limit;
- StreamComparator comp = getComp(sqlVisitor.sorts);
+ Comparator<Tuple> comp = getComp(sqlVisitor.sorts);
//Rank the Tuples
//If parallel stream is used ALL the Rolled up tuples from the workers will be ranked
//Providing a true Top or Bottom.
@@ -312,35 +311,35 @@ public class SQLHandler extends RequestH
return "asc";
}
- private static StreamComparator bucketSortComp(Bucket[] buckets, String dir) {
- FieldComparator[] comps = new FieldComparator[buckets.length];
+ private static Comparator<Tuple> bucketSortComp(Bucket[] buckets, String dir) {
+ Comparator<Tuple>[] comps = new Comparator[buckets.length];
for(int i=0; i<buckets.length; i++) {
ComparatorOrder comparatorOrder = ascDescComp(dir);
String sortKey = buckets[i].toString();
- comps[i] = new FieldComparator(stripQuotes(sortKey), comparatorOrder);
+ comps[i] = new StreamComparator(stripQuotes(sortKey), comparatorOrder);
}
if(comps.length == 1) {
return comps[0];
} else {
- return new MultipleFieldComparator(comps);
+ return new MultiComp(comps);
}
}
- private static StreamComparator getComp(List<SortItem> sortItems) {
- FieldComparator[] comps = new FieldComparator[sortItems.size()];
+ private static Comparator<Tuple> getComp(List<SortItem> sortItems) {
+ Comparator<Tuple>[] comps = new Comparator[sortItems.size()];
for(int i=0; i<sortItems.size(); i++) {
SortItem sortItem = sortItems.get(i);
String ordering = sortItem.getOrdering().toString();
ComparatorOrder comparatorOrder = ascDescComp(ordering);
String sortKey = sortItem.getSortKey().toString();
- comps[i] = new FieldComparator(stripQuotes(sortKey), comparatorOrder);
+ comps[i] = new StreamComparator(stripQuotes(sortKey), comparatorOrder);
}
if(comps.length == 1) {
return comps[0];
} else {
- return new MultipleFieldComparator(comps);
+ return new MultiComp(comps);
}
}
@@ -672,10 +671,6 @@ public class SQLHandler extends RequestH
return children;
}
- public StreamComparator getStreamSort(){
- return stream.getStreamSort();
- }
-
public void setStreamContext(StreamContext context) {
stream.setStreamContext(context);
}
@@ -713,10 +708,6 @@ public class SQLHandler extends RequestH
this.stream.close();
}
- public StreamComparator getStreamSort(){
- return stream.getStreamSort();
- }
-
public List<TupleStream> children() {
List<TupleStream> children = new ArrayList();
children.add(stream);
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java?rev=1692329&r1=1692328&r2=1692329&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java Wed Jul 22 21:54:55 2015
@@ -26,7 +26,92 @@ import org.apache.solr.client.solrj.io.s
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
-/** Defines a comparator we can use with TupleStreams */
-public interface StreamComparator extends Comparator<Tuple>, Expressible, Serializable {
- public boolean isDerivedFrom(StreamComparator base);
+/**
+ * An equality field Comparator which compares a field of two Tuples and determines sort order.
+ **/
+public class StreamComparator implements Comparator<Tuple>, Expressible, Serializable {
+
+ private static final long serialVersionUID = 1;
+
+ private String leftField;
+ private String rightField;
+ private final ComparatorOrder order;
+ private ComparatorLambda comparator;
+
+ public StreamComparator(String field, ComparatorOrder order) {
+ this.leftField = field;
+ this.rightField = field;
+ this.order = order;
+ assignComparator();
+ }
+ public StreamComparator(String leftField, String rightField, ComparatorOrder order){
+ this.leftField = leftField;
+ this.rightField = rightField;
+ this.order = order;
+ assignComparator();
+ }
+
+ public StreamExpressionParameter toExpression(StreamFactory factory){
+ StringBuilder sb = new StringBuilder();
+
+ sb.append(leftField);
+
+ if(!leftField.equals(rightField)){
+ sb.append("=");
+ sb.append(rightField);
+ }
+
+ sb.append(" ");
+ sb.append(order);
+
+ return new StreamExpressionValue(sb.toString());
+ }
+
+ /*
+ * What're we doing here messing around with lambdas for the comparator logic?
+ * We want the compare(...) function to run as fast as possible because it will be called many many
+ * times over the lifetime of this object. For that reason we want to limit the number of comparisons
+ * taking place in the compare(...) function. Because this class supports both ascending and
+ * descending comparisons and the logic for each is slightly different, we want to do the
+ * if(ascending){ compare like this } else { compare like this }
+ * check only once - we can do that in the constructor of this class, create a lambda, and then execute
+ * that lambda in the compare function. A little bit of branch prediction savings right here.
+ */
+ private void assignComparator(){
+ if(ComparatorOrder.DESCENDING == order){
+ comparator = new ComparatorLambda() {
+ @Override
+ public int compare(Tuple leftTuple, Tuple rightTuple) {
+ Comparable leftComp = (Comparable)leftTuple.get(leftField);
+ Comparable rightComp = (Comparable)rightTuple.get(rightField);
+
+ if(leftComp == rightComp){ return 0; } // if both null then they are equal. if both are same ref then are equal
+ if(null == leftComp){ return 1; }
+ if(null == rightComp){ return -1; }
+
+ return rightComp.compareTo(leftComp);
+ }
+ };
+ }
+ else{
+ // See above for black magic reasoning.
+ comparator = new ComparatorLambda() {
+ @Override
+ public int compare(Tuple leftTuple, Tuple rightTuple) {
+ Comparable leftComp = (Comparable)leftTuple.get(leftField);
+ Comparable rightComp = (Comparable)rightTuple.get(rightField);
+
+ if(leftComp == rightComp){ return 0; } // if both null then they are equal. if both are same ref then are equal
+ if(null == leftComp){ return -1; }
+ if(null == rightComp){ return 1; }
+
+ return leftComp.compareTo(rightComp);
+ }
+ };
+ }
+ }
+
+ public int compare(Tuple leftTuple, Tuple rightTuple) {
+ return comparator.compare(leftTuple, rightTuple);
+ }
}
\ No newline at end of file
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/StreamEqualitor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/StreamEqualitor.java?rev=1692329&r1=1692328&r2=1692329&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/StreamEqualitor.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/StreamEqualitor.java Wed Jul 22 21:54:55 2015
@@ -18,13 +18,54 @@
package org.apache.solr.client.solrj.io.eq;
import java.io.Serializable;
+import java.util.Comparator;
import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
-/** Defines a comparator we can use with TupleStreams */
-public interface StreamEqualitor extends Equalitor<Tuple>, Expressible, Serializable {
- public boolean isDerivedFrom(StreamEqualitor base);
- public boolean isDerivedFrom(StreamComparator base);
+/**
+ * An equality field Equalitor which compares a field of two Tuples and determines if they are equal.
+ **/
+public class StreamEqualitor implements Equalitor<Tuple>, Expressible, Serializable {
+
+ private static final long serialVersionUID = 1;
+
+ private String leftFieldName;
+ private String rightFieldName;
+ private StreamComparator comparator;
+
+ public StreamEqualitor(String fieldName) {
+ init(fieldName, fieldName);
+ }
+ public StreamEqualitor(String leftFieldName, String rightFieldName){
+ init(leftFieldName, rightFieldName);
+ }
+
+ private void init(String leftFieldName, String rightFieldName){
+ this.leftFieldName = leftFieldName;
+ this.rightFieldName = rightFieldName;
+ this.comparator = new StreamComparator(leftFieldName, rightFieldName, ComparatorOrder.ASCENDING);
+ }
+
+ public StreamExpressionParameter toExpression(StreamFactory factory){
+ StringBuilder sb = new StringBuilder();
+
+ sb.append(leftFieldName);
+
+ if(!leftFieldName.equals(rightFieldName)){
+ sb.append("=");
+ sb.append(rightFieldName);
+ }
+
+ return new StreamExpressionValue(sb.toString());
+ }
+
+ public boolean test(Tuple leftTuple, Tuple rightTuple) {
+ return 0 == comparator.compare(leftTuple, rightTuple);
+ }
}
\ No newline at end of file
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java?rev=1692329&r1=1692328&r2=1692329&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java Wed Jul 22 21:54:55 2015
@@ -38,8 +38,7 @@ import org.apache.solr.client.solrj.impl
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
-import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
-import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.MultiComp;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
@@ -70,7 +69,7 @@ public class CloudSolrStream extends Tup
protected String collection;
protected Map<String,String> params;
private Map<String, String> fieldMappings;
- protected StreamComparator comp;
+ protected Comparator<Tuple> comp;
private int zkConnectTimeout = 10000;
private int zkClientTimeout = 10000;
private int numWorkers;
@@ -243,7 +242,7 @@ public class CloudSolrStream extends Tup
return solrStreams;
}
- private StreamComparator parseComp(String sort, String fl) throws IOException {
+ private Comparator<Tuple> parseComp(String sort, String fl) throws IOException {
String[] fls = fl.split(",");
HashSet fieldSet = new HashSet();
@@ -252,7 +251,7 @@ public class CloudSolrStream extends Tup
}
String[] sorts = sort.split(",");
- StreamComparator[] comps = new StreamComparator[sorts.length];
+ Comparator[] comps = new Comparator[sorts.length];
for(int i=0; i<sorts.length; i++) {
String s = sorts[i];
@@ -270,11 +269,11 @@ public class CloudSolrStream extends Tup
fieldName = fieldMappings.get(fieldName);
}
- comps[i] = new FieldComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING);
+ comps[i] = new StreamComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING);
}
if(comps.length > 1) {
- return new MultipleFieldComparator(comps);
+ return new MultiComp(comps);
} else {
return comps[0];
}
@@ -352,11 +351,6 @@ public class CloudSolrStream extends Tup
cloudSolrClient.close();
}
}
-
- /** Return the stream sort - ie, the order in which records are returned */
- public StreamComparator getStreamSort(){
- return comp;
- }
public Tuple read() throws IOException {
return _read();
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java?rev=1692329&r1=1692328&r2=1692329&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java Wed Jul 22 21:54:55 2015
@@ -19,11 +19,11 @@ package org.apache.solr.client.solrj.io.
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import org.apache.solr.client.solrj.io.Tuple;
-import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
@@ -43,10 +43,12 @@ public class MergeStream extends TupleSt
private PushBackStream streamA;
private PushBackStream streamB;
- private StreamComparator comp;
+ private Comparator<Tuple> comp;
- public MergeStream(TupleStream streamA, TupleStream streamB, StreamComparator comp) throws IOException {
- init(streamA, streamB, comp);
+ public MergeStream(TupleStream streamA, TupleStream streamB, Comparator<Tuple> comp) {
+ this.streamA = new PushBackStream(streamA);
+ this.streamB = new PushBackStream(streamB);
+ this.comp = comp;
}
public MergeStream(StreamExpression expression,StreamFactory factory) throws IOException {
@@ -62,26 +64,15 @@ public class MergeStream extends TupleSt
if(2 != streamExpressions.size()){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting two streams but found %d (must be PushBackStream types)",expression, streamExpressions.size()));
}
-
+ this.streamA = new PushBackStream(factory.constructStream(streamExpressions.get(0)));
+ this.streamB = new PushBackStream(factory.constructStream(streamExpressions.get(1)));
+
if(null == onExpression || !(onExpression.getParameter() instanceof StreamExpressionValue)){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'on' parameter listing fields to merge on but didn't find one",expression));
}
- init( factory.constructStream(streamExpressions.get(0)),
- factory.constructStream(streamExpressions.get(1)),
- factory.constructComparator(((StreamExpressionValue)onExpression.getParameter()).getValue(), FieldComparator.class)
- );
- }
-
- private void init(TupleStream streamA, TupleStream streamB, StreamComparator comp) throws IOException {
- this.streamA = new PushBackStream(streamA);
- this.streamB = new PushBackStream(streamB);
- this.comp = comp;
-
- // streamA and streamB must both be sorted so that comp can be derived from
- if(!comp.isDerivedFrom(streamA.getStreamSort()) || !comp.isDerivedFrom(streamB.getStreamSort())){
- throw new IOException("Invalid MergeStream - both substream comparators (sort) must be a superset of this stream's comparator.");
- }
+ // Merge is always done over equality, so always use an EqualTo comparator
+ this.comp = factory.constructComparator(((StreamExpressionValue)onExpression.getParameter()).getValue(), StreamComparator.class);
}
@Override
@@ -94,7 +85,12 @@ public class MergeStream extends TupleSt
expression.addParameter(streamB.toExpression(factory));
// on
- expression.addParameter(new StreamExpressionNamedParameter("on",comp.toExpression(factory)));
+ if(comp instanceof Expressible){
+ expression.addParameter(new StreamExpressionNamedParameter("on",((Expressible)comp).toExpression(factory)));
+ }
+ else{
+ throw new IOException("This MergeStream contains a non-expressible comparator - it cannot be converted to an expression");
+ }
return expression;
}
@@ -149,12 +145,6 @@ public class MergeStream extends TupleSt
return b;
}
}
-
- /** Return the stream sort - ie, the order in which records are returned */
- public StreamComparator getStreamSort(){
- return comp;
- }
-
public int getCost() {
return 0;
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java?rev=1692329&r1=1692328&r2=1692329&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java Wed Jul 22 21:54:55 2015
@@ -24,6 +24,7 @@ import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -33,7 +34,6 @@ import java.util.Map.Entry;
import java.util.Random;
import org.apache.solr.client.solrj.io.Tuple;
-import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
@@ -65,7 +65,7 @@ public class ParallelStream extends Clou
String collection,
TupleStream tupleStream,
int workers,
- StreamComparator comp) throws IOException {
+ Comparator<Tuple> comp) throws IOException {
init(zkHost,collection,tupleStream,workers,comp);
}
@@ -74,7 +74,7 @@ public class ParallelStream extends Clou
String collection,
String expressionString,
int workers,
- StreamComparator comp) throws IOException {
+ Comparator<Tuple> comp) throws IOException {
objectSerialize = false;
TupleStream tStream = this.streamFactory.constructStream(expressionString);
init(zkHost,collection, tStream, workers,comp);
@@ -140,12 +140,12 @@ public class ParallelStream extends Clou
// We've got all the required items
TupleStream stream = factory.constructStream(streamExpressions.get(0));
- StreamComparator comp = factory.constructComparator(((StreamExpressionValue)sortExpression.getParameter()).getValue(), FieldComparator.class);
+ Comparator<Tuple> comp = factory.constructComparator(((StreamExpressionValue)sortExpression.getParameter()).getValue(), StreamComparator.class);
streamFactory = factory;
init(zkHost,collectionName,stream,workersInt,comp);
}
- private void init(String zkHost,String collection,TupleStream tupleStream,int workers,StreamComparator comp) throws IOException{
+ private void init(String zkHost,String collection,TupleStream tupleStream,int workers,Comparator<Tuple> comp) throws IOException{
this.zkHost = zkHost;
this.collection = collection;
this.workers = workers;
@@ -179,7 +179,12 @@ public class ParallelStream extends Clou
}
// sort
- expression.addParameter(new StreamExpressionNamedParameter("sort",comp.toExpression(factory)));
+ if(comp instanceof Expressible){
+ expression.addParameter(new StreamExpressionNamedParameter("sort",((Expressible)comp).toExpression(factory)));
+ }
+ else{
+ throw new IOException("This ParallelStream contains a non-expressible comparator - it cannot be converted to an expression");
+ }
// zkHost
expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java?rev=1692329&r1=1692328&r2=1692329&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java Wed Jul 22 21:54:55 2015
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.solr.client.solrj.io.Tuple;
-import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
@@ -83,13 +82,6 @@ public class PushBackStream extends Tupl
return stream.read();
}
}
-
- /** Return the stream sort - ie, the order in which records are returned
- * This returns the streamSort of the substream */
- public StreamComparator getStreamSort(){
- return stream.getStreamSort();
- }
-
public int getCost() {
return 0;
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java?rev=1692329&r1=1692328&r2=1692329&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java Wed Jul 22 21:54:55 2015
@@ -27,7 +27,6 @@ import java.util.Locale;
import java.util.PriorityQueue;
import org.apache.solr.client.solrj.io.Tuple;
-import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
@@ -44,14 +43,14 @@ public class RankStream extends TupleStr
private static final long serialVersionUID = 1;
- private TupleStream stream;
- private StreamComparator comp;
+ private TupleStream tupleStream;
+ private Comparator<Tuple> comp;
private int size;
private transient PriorityQueue<Tuple> top;
private transient boolean finished = false;
private transient LinkedList<Tuple> topList;
- public RankStream(TupleStream tupleStream, int size, StreamComparator comp) throws IOException {
+ public RankStream(TupleStream tupleStream, int size, Comparator<Tuple> comp) {
init(tupleStream,size,comp);
}
@@ -88,17 +87,15 @@ public class RankStream extends TupleStr
}
TupleStream stream = factory.constructStream(streamExpressions.get(0));
- StreamComparator comp = factory.constructComparator(((StreamExpressionValue)sortExpression.getParameter()).getValue(), FieldComparator.class);
+ Comparator<Tuple> comp = factory.constructComparator(((StreamExpressionValue)sortExpression.getParameter()).getValue(), StreamComparator.class);
init(stream,nInt,comp);
}
- private void init(TupleStream tupleStream, int size, StreamComparator comp) throws IOException{
- this.stream = tupleStream;
+ private void init(TupleStream tupleStream, int size, Comparator<Tuple> comp){
+ this.tupleStream = tupleStream;
this.comp = comp;
this.size = size;
-
- // Rank stream does not demand that its order is derivable from the order of the incoming stream. No derivation check required
}
@Override
@@ -110,47 +107,52 @@ public class RankStream extends TupleStr
expression.addParameter(new StreamExpressionNamedParameter("n", Integer.toString(size)));
// stream
- if(stream instanceof Expressible){
- expression.addParameter(((Expressible)stream).toExpression(factory));
+ if(tupleStream instanceof Expressible){
+ expression.addParameter(((Expressible)tupleStream).toExpression(factory));
}
else{
throw new IOException("This RankStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
// sort
- expression.addParameter(new StreamExpressionNamedParameter("sort",comp.toExpression(factory)));
+ if(comp instanceof Expressible){
+ expression.addParameter(new StreamExpressionNamedParameter("sort",((Expressible)comp).toExpression(factory)));
+ }
+ else{
+ throw new IOException("This RankStream contains a non-expressible comparator - it cannot be converted to an expression");
+ }
return expression;
}
public void setStreamContext(StreamContext context) {
- this.stream.setStreamContext(context);
+ this.tupleStream.setStreamContext(context);
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList();
- l.add(stream);
+ l.add(tupleStream);
return l;
}
public void open() throws IOException {
this.top = new PriorityQueue(size, new ReverseComp(comp));
this.topList = new LinkedList();
- stream.open();
+ tupleStream.open();
}
public void close() throws IOException {
- stream.close();
+ tupleStream.close();
}
- public StreamComparator getComparator(){
+ public Comparator<Tuple> getComparator(){
return this.comp;
}
public Tuple read() throws IOException {
if(!finished) {
while(true) {
- Tuple tuple = stream.read();
+ Tuple tuple = tupleStream.read();
if(tuple.EOF) {
finished = true;
int s = top.size();
@@ -176,11 +178,6 @@ public class RankStream extends TupleStr
return topList.pollFirst();
}
-
- /** Return the stream sort - ie, the order in which records are returned */
- public StreamComparator getStreamSort(){
- return comp;
- }
public int getCost() {
return 0;
@@ -188,16 +185,14 @@ public class RankStream extends TupleStr
class ReverseComp implements Comparator<Tuple>, Serializable {
- private StreamComparator comp;
+ private Comparator<Tuple> comp;
- public ReverseComp(StreamComparator comp) {
+ public ReverseComp(Comparator<Tuple> comp) {
this.comp = comp;
}
public int compare(Tuple t1, Tuple t2) {
return comp.compare(t1, t2)*(-1);
}
-
-
}
}
\ No newline at end of file
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java?rev=1692329&r1=1692328&r2=1692329&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java Wed Jul 22 21:54:55 2015
@@ -19,13 +19,13 @@ package org.apache.solr.client.solrj.io.
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.solr.client.solrj.io.Tuple;
-import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
@@ -52,13 +52,15 @@ public class ReducerStream extends Tuple
private static final long serialVersionUID = 1;
- private PushBackStream stream;
- private StreamComparator comp;
+ private PushBackStream tupleStream;
+ private Comparator<Tuple> comp;
private transient Tuple currentGroupHead;
- public ReducerStream(TupleStream stream,StreamComparator comp) throws IOException {
- init(stream,comp);
+ public ReducerStream(TupleStream tupleStream,
+ Comparator<Tuple> comp) {
+ this.tupleStream = new PushBackStream(tupleStream);
+ this.comp = comp;
}
public ReducerStream(StreamExpression expression, StreamFactory factory) throws IOException{
@@ -73,25 +75,15 @@ public class ReducerStream extends Tuple
if(1 != streamExpressions.size()){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
- }
+ }
+ this.tupleStream = new PushBackStream(factory.constructStream(streamExpressions.get(0)));
+
if(null == byExpression || !(byExpression.getParameter() instanceof StreamExpressionValue)){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'by' parameter listing fields to group by but didn't find one",expression));
}
// Reducing is always done over equality, so always use an EqualTo comparator
-
- init(factory.constructStream(streamExpressions.get(0)),
- factory.constructComparator(((StreamExpressionValue)byExpression.getParameter()).getValue(), FieldComparator.class)
- );
- }
-
- private void init(TupleStream stream, StreamComparator comp) throws IOException{
- this.stream = new PushBackStream(stream);
- this.comp = comp;
-
- if(!comp.isDerivedFrom(stream.getStreamSort())){
- throw new IOException("Invalid ReducerStream - substream comparator (sort) must be a superset of this stream's comparator.");
- }
+ this.comp = factory.constructComparator(((StreamExpressionValue)byExpression.getParameter()).getValue(), StreamComparator.class);
}
@Override
@@ -100,7 +92,7 @@ public class ReducerStream extends Tuple
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// stream
- expression.addParameter(stream.toExpression(factory));
+ expression.addParameter(tupleStream.toExpression(factory));
// over
if(comp instanceof Expressible){
@@ -114,32 +106,32 @@ public class ReducerStream extends Tuple
}
public void setStreamContext(StreamContext context) {
- this.stream.setStreamContext(context);
+ this.tupleStream.setStreamContext(context);
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList();
- l.add(stream);
+ l.add(tupleStream);
return l;
}
public void open() throws IOException {
- stream.open();
+ tupleStream.open();
}
public void close() throws IOException {
- stream.close();
+ tupleStream.close();
}
public Tuple read() throws IOException {
List<Map> maps = new ArrayList();
while(true) {
- Tuple t = stream.read();
+ Tuple t = tupleStream.read();
if(t.EOF) {
if(maps.size() > 0) {
- stream.pushBack(t);
+ tupleStream.pushBack(t);
Map map1 = maps.get(0);
Map map2 = new HashMap();
map2.putAll(map1);
@@ -159,7 +151,7 @@ public class ReducerStream extends Tuple
maps.add(t.getMap());
} else {
Tuple groupHead = currentGroupHead.clone();
- stream.pushBack(t);
+ tupleStream.pushBack(t);
currentGroupHead = null;
groupHead.setMaps(maps);
return groupHead;
@@ -167,11 +159,6 @@ public class ReducerStream extends Tuple
}
}
}
-
- /** Return the stream sort - ie, the order in which records are returned */
- public StreamComparator getStreamSort(){
- return comp;
- }
public int getCost() {
return 0;
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java?rev=1692329&r1=1692328&r2=1692329&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java Wed Jul 22 21:54:55 2015
@@ -26,7 +26,6 @@ import java.util.ArrayList;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.HashKey;
-import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
import org.apache.solr.client.solrj.io.stream.metrics.Metric;
@@ -67,10 +66,6 @@ public class RollupStream extends TupleS
tupleStream.close();
}
- public StreamComparator getStreamSort(){
- return tupleStream.getStreamSort();
- }
-
public Tuple read() throws IOException {
while(true) {
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java?rev=1692329&r1=1692328&r2=1692329&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java Wed Jul 22 21:54:55 2015
@@ -27,7 +27,6 @@ import java.util.Iterator;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
-import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
@@ -164,11 +163,6 @@ public class SolrStream extends TupleStr
return new Tuple(fields);
}
}
-
- /** There is no known sort applied to a SolrStream */
- public StreamComparator getStreamSort(){
- return null;
- }
private Map mapFields(Map fields, Map<String,String> mappings) {
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java?rev=1692329&r1=1692328&r2=1692329&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java Wed Jul 22 21:54:55 2015
@@ -22,7 +22,6 @@ import java.io.Serializable;
import java.util.List;
import org.apache.solr.client.solrj.io.Tuple;
-import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
@@ -44,10 +43,7 @@ public abstract class TupleStream implem
public abstract Tuple read() throws IOException;
- public abstract StreamComparator getStreamSort();
-
public int getCost() {
return 0;
}
-
}
\ No newline at end of file
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java?rev=1692329&r1=1692328&r2=1692329&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java Wed Jul 22 21:54:55 2015
@@ -24,10 +24,8 @@ import java.util.List;
import java.util.Locale;
import org.apache.solr.client.solrj.io.Tuple;
-import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eq.Equalitor;
-import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
@@ -46,12 +44,13 @@ public class UniqueStream extends TupleS
private static final long serialVersionUID = 1;
- private TupleStream stream;
+ private TupleStream tupleStream;
private Equalitor<Tuple> eq;
private transient Tuple currentTuple;
- public UniqueStream(TupleStream stream, StreamEqualitor eq) throws IOException {
- init(stream,eq);
+ public UniqueStream(TupleStream tupleStream, Equalitor<Tuple> eq) {
+ this.tupleStream = tupleStream;
+ this.eq = eq;
}
public UniqueStream(StreamExpression expression,StreamFactory factory) throws IOException {
@@ -67,21 +66,14 @@ public class UniqueStream extends TupleS
if(1 != streamExpressions.size()){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
}
+ this.tupleStream = factory.constructStream(streamExpressions.get(0));
if(null == overExpression || !(overExpression.getParameter() instanceof StreamExpressionValue)){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'over' parameter listing fields to unique over but didn't find one",expression));
}
- init(factory.constructStream(streamExpressions.get(0)), factory.constructEqualitor(((StreamExpressionValue)overExpression.getParameter()).getValue(), FieldEqualitor.class));
- }
-
- private void init(TupleStream stream, StreamEqualitor eq) throws IOException{
- this.stream = stream;
- this.eq = eq;
-
- if(!eq.isDerivedFrom(stream.getStreamSort())){
- throw new IOException("Invalid UniqueStream - substream comparator (sort) must be a superset of this stream's equalitor.");
- }
+ // Uniqueness is always done over equality, so always use an EqualTo comparator
+ this.eq = factory.constructEqualitor(((StreamExpressionValue)overExpression.getParameter()).getValue(), StreamEqualitor.class);
}
@Override
@@ -90,8 +82,8 @@ public class UniqueStream extends TupleS
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// streams
- if(stream instanceof Expressible){
- expression.addParameter(((Expressible)stream).toExpression(factory));
+ if(tupleStream instanceof Expressible){
+ expression.addParameter(((Expressible)tupleStream).toExpression(factory));
}
else{
throw new IOException("This UniqueStream contains a non-expressible TupleStream - it cannot be converted to an expression");
@@ -109,25 +101,25 @@ public class UniqueStream extends TupleS
}
public void setStreamContext(StreamContext context) {
- this.stream.setStreamContext(context);
+ this.tupleStream.setStreamContext(context);
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList<TupleStream>();
- l.add(stream);
+ l.add(tupleStream);
return l;
}
public void open() throws IOException {
- stream.open();
+ tupleStream.open();
}
public void close() throws IOException {
- stream.close();
+ tupleStream.close();
}
public Tuple read() throws IOException {
- Tuple tuple = stream.read();
+ Tuple tuple = tupleStream.read();
if(tuple.EOF) {
return tuple;
}
@@ -139,7 +131,7 @@ public class UniqueStream extends TupleS
while(true) {
if(eq.test(currentTuple, tuple)){
//We have duplicate tuple so read the next tuple from the stream.
- tuple = stream.read();
+ tuple = tupleStream.read();
if(tuple.EOF) {
return tuple;
}
@@ -152,11 +144,6 @@ public class UniqueStream extends TupleS
}
}
- /** Return the stream sort - ie, the order in which records are returned */
- public StreamComparator getStreamSort(){
- return stream.getStreamSort();
- }
-
public int getCost() {
return 0;
}
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java?rev=1692329&r1=1692328&r2=1692329&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java Wed Jul 22 21:54:55 2015
@@ -5,6 +5,7 @@ import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
@@ -13,11 +14,9 @@ import java.util.Map.Entry;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
-import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
-import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.comp.MultiComp;
import org.apache.solr.client.solrj.io.eq.Equalitor;
-import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
-import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
+import org.apache.solr.client.solrj.io.eq.MultiEqualitor;
import org.apache.solr.client.solrj.io.stream.TupleStream;
/*
@@ -180,14 +179,14 @@ public class StreamFactory implements Se
throw new IOException(String.format(Locale.ROOT,"Invalid stream expression %s - function '%s' is unknown (not mapped to a valid TupleStream)", expression, expression.getFunctionName()));
}
- public StreamComparator constructComparator(String comparatorString, Class comparatorType) throws IOException {
+ public Comparator<Tuple> constructComparator(String comparatorString, Class comparatorType) throws IOException {
if(comparatorString.contains(",")){
String[] parts = comparatorString.split(",");
- StreamComparator[] comps = new StreamComparator[parts.length];
+ Comparator[] comps = new Comparator[parts.length];
for(int idx = 0; idx < parts.length; ++idx){
comps[idx] = constructComparator(parts[idx].trim(), comparatorType);
}
- return new MultipleFieldComparator(comps);
+ return new MultiComp(comps);
}
else{
String[] parts = comparatorString.split(" ");
@@ -198,18 +197,18 @@ public class StreamFactory implements Se
String fieldName = parts[0].trim();
String order = parts[1].trim();
- return (StreamComparator)createInstance(comparatorType, new Class[]{ String.class, ComparatorOrder.class }, new Object[]{ fieldName, ComparatorOrder.fromString(order) });
+ return (Comparator)createInstance(comparatorType, new Class[]{ String.class, ComparatorOrder.class }, new Object[]{ fieldName, ComparatorOrder.fromString(order) });
}
}
- public StreamEqualitor constructEqualitor(String equalitorString, Class equalitorType) throws IOException {
+ public Equalitor<Tuple> constructEqualitor(String equalitorString, Class equalitorType) throws IOException {
if(equalitorString.contains(",")){
String[] parts = equalitorString.split(",");
- StreamEqualitor[] eqs = new StreamEqualitor[parts.length];
+ Equalitor[] eqs = new Equalitor[parts.length];
for(int idx = 0; idx < parts.length; ++idx){
eqs[idx] = constructEqualitor(parts[idx].trim(), equalitorType);
}
- return new MultipleFieldEqualitor(eqs);
+ return new MultiEqualitor(eqs);
}
else{
String leftFieldName;
@@ -228,7 +227,7 @@ public class StreamFactory implements Se
leftFieldName = rightFieldName = equalitorString.trim();
}
- return (StreamEqualitor)createInstance(equalitorType, new Class[]{ String.class, String.class }, new Object[]{ leftFieldName, rightFieldName });
+ return (Equalitor)createInstance(equalitorType, new Class[]{ String.class, String.class }, new Object[]{ leftFieldName, rightFieldName });
}
}
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CountStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CountStream.java?rev=1692329&r1=1692328&r2=1692329&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CountStream.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CountStream.java Wed Jul 22 21:54:55 2015
@@ -23,7 +23,6 @@ import java.util.List;
import java.util.Locale;
import org.apache.solr.client.solrj.io.Tuple;
-import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
@@ -97,9 +96,4 @@ public class CountStream extends TupleSt
return t;
}
}
-
- @Override
- public StreamComparator getStreamSort() {
- return null;
- }
}
\ No newline at end of file
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java?rev=1692329&r1=1692328&r2=1692329&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java Wed Jul 22 21:54:55 2015
@@ -601,12 +601,7 @@ public class StreamExpressionTest extend
.withStreamFunction("group", ReducerStream.class)
.withStreamFunction("parallel", ParallelStream.class);
- ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel("
- + "collection1, "
- + "top("
- + "search(collection1, q=\"*:*\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), "
- + "n=\"11\", "
- + "sort=\"a_i desc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_i desc\")");
+ ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, top(search(collection1, q=\"*:*\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), n=\"11\", sort=\"a_i desc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_i desc\")");
List<Tuple> tuples = getTuples(pstream);
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java?rev=1692329&r1=1692328&r2=1692329&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java Wed Jul 22 21:54:55 2015
@@ -78,10 +78,10 @@ public class StreamExpressionToExpession
String expressionString;
// Basic test
- stream = new UniqueStream(StreamExpressionParser.parse("unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f\")"), factory);
+ stream = new UniqueStream(StreamExpressionParser.parse("unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f asc\")"), factory);
expressionString = stream.toExpression(factory).toString();
assertTrue(expressionString.contains("unique(search(collection1"));
- assertTrue(expressionString.contains("over=a_f"));
+ assertTrue(expressionString.contains("over=\"a_f asc\""));
}
@Test
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java?rev=1692329&r1=1692328&r2=1692329&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java Wed Jul 22 21:54:55 2015
@@ -29,9 +29,9 @@ import org.apache.lucene.util.LuceneTest
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
-import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
-import org.apache.solr.client.solrj.io.comp.FieldComparator;
-import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
+import org.apache.solr.client.solrj.io.comp.MultiComp;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
@@ -139,23 +139,11 @@ public class StreamingTest extends Abstr
Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
- UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
+ UniqueStream ustream = new UniqueStream(stream, new StreamEqualitor("a_f"));
List<Tuple> tuples = getTuples(ustream);
assert(tuples.size() == 4);
assertOrder(tuples, 0,1,3,4);
-
- try {
- params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
- stream = new CloudSolrStream(zkHost, "collection1", params);
- ustream = new UniqueStream(stream, new FieldEqualitor("a_i"));
- throw new Exception("Equalitors did not match but no excepion was thrown");
- } catch(Exception e) {
- if(!e.getMessage().equals("Invalid UniqueStream - substream comparator (sort) must be a superset of this stream's equalitor.")) {
- throw e;
- }
- }
-
del("*:*");
commit();
@@ -200,7 +188,7 @@ public class StreamingTest extends Abstr
Map paramsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
- ParallelStream pstream = new ParallelStream(zkHost, "collection1", stream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
+ ParallelStream pstream = new ParallelStream(zkHost, "collection1", stream, 2, new StreamComparator("a_s",ComparatorOrder.ASCENDING));
List<Tuple> tuples = getTuples(pstream);
@@ -233,8 +221,8 @@ public class StreamingTest extends Abstr
Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc", "partitionKeys", "a_f");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
- UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
- ParallelStream pstream = new ParallelStream(zkHost, "collection1", ustream, 2, new FieldComparator("a_f",ComparatorOrder.ASCENDING));
+ UniqueStream ustream = new UniqueStream(stream, new StreamEqualitor("a_f"));
+ ParallelStream pstream = new ParallelStream(zkHost, "collection1", ustream, 2, new StreamComparator("a_f",ComparatorOrder.ASCENDING));
List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == 5);
assertOrder(tuples, 0,1,3,4,6);
@@ -267,7 +255,7 @@ public class StreamingTest extends Abstr
Map params = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
- RankStream rstream = new RankStream(stream, 3, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
+ RankStream rstream = new RankStream(stream, 3, new StreamComparator("a_i",ComparatorOrder.DESCENDING));
List<Tuple> tuples = getTuples(rstream);
@@ -299,8 +287,8 @@ public class StreamingTest extends Abstr
Map params = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
- RankStream rstream = new RankStream(stream, 11, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
- ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
+ RankStream rstream = new RankStream(stream, 11, new StreamComparator("a_i",ComparatorOrder.DESCENDING));
+ ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new StreamComparator("a_i",ComparatorOrder.DESCENDING));
List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == 10);
@@ -366,7 +354,7 @@ public class StreamingTest extends Abstr
//Test with spaces in the parameter lists.
Map paramsA = mapParams("q","*:*","fl","id,a_s, a_i, a_f","sort", "a_s asc , a_f asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
- ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
+ ReducerStream rstream = new ReducerStream(stream, new StreamComparator("a_s",ComparatorOrder.ASCENDING));
List<Tuple> tuples = getTuples(rstream);
@@ -385,18 +373,6 @@ public class StreamingTest extends Abstr
List<Map> maps2 = t2.getMaps();
assertMaps(maps2, 4, 6);
- try {
-
- paramsA = mapParams("q","*:*","fl","id,a_s, a_i, a_f","sort", "a_i asc , a_f asc");
- stream = new CloudSolrStream(zkHost, "collection1", paramsA);
- rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
- throw new Exception("Sorts did not match up and Exception was not not thrown.");
- } catch (Exception e) {
- if(!e.getMessage().equals("Invalid ReducerStream - substream comparator (sort) must be a superset of this stream's comparator.")) {
- throw e;
- }
- }
-
del("*:*");
@@ -425,7 +401,7 @@ public class StreamingTest extends Abstr
//Test with spaces in the parameter lists.
Map paramsA = mapParams("q", "blah", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
- ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+ ReducerStream rstream = new ReducerStream(stream, new StreamComparator("a_s", ComparatorOrder.ASCENDING));
List<Tuple> tuples = getTuples(rstream);
@@ -456,8 +432,8 @@ public class StreamingTest extends Abstr
Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
- ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
- ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
+ ReducerStream rstream = new ReducerStream(stream, new StreamComparator("a_s",ComparatorOrder.ASCENDING));
+ ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new StreamComparator("a_s",ComparatorOrder.ASCENDING));
List<Tuple> tuples = getTuples(pstream);
@@ -480,8 +456,8 @@ public class StreamingTest extends Abstr
paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
stream = new CloudSolrStream(zkHost, "collection1", paramsA);
- rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.DESCENDING));
- pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.DESCENDING));
+ rstream = new ReducerStream(stream, new StreamComparator("a_s",ComparatorOrder.DESCENDING));
+ pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new StreamComparator("a_s",ComparatorOrder.DESCENDING));
tuples = getTuples(pstream);
@@ -663,7 +639,7 @@ public class StreamingTest extends Abstr
new CountMetric()};
RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
- ParallelStream parallelStream = new ParallelStream(zkHost, "collection1", rollupStream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+ ParallelStream parallelStream = new ParallelStream(zkHost, "collection1", rollupStream, 2, new StreamComparator("a_s", ComparatorOrder.ASCENDING));
List<Tuple> tuples = getTuples(parallelStream);
assert(tuples.size() == 3);
@@ -763,8 +739,8 @@ public class StreamingTest extends Abstr
Map paramsA = mapParams("q","blah","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
- ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
- ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+ ReducerStream rstream = new ReducerStream(stream, new StreamComparator("a_s", ComparatorOrder.ASCENDING));
+ ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new StreamComparator("a_s", ComparatorOrder.ASCENDING));
List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == 0);
@@ -833,7 +809,7 @@ public class StreamingTest extends Abstr
Map paramsB = mapParams("q","id:(0 2 3)","fl","id,a_s,a_i","sort", "a_i asc");
CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
- MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
+ MergeStream mstream = new MergeStream(streamA, streamB, new StreamComparator("a_i",ComparatorOrder.ASCENDING));
List<Tuple> tuples = getTuples(mstream);
assert(tuples.size() == 5);
@@ -846,7 +822,7 @@ public class StreamingTest extends Abstr
paramsB = mapParams("q","id:(0 2 3)","fl","id,a_s,a_i","sort", "a_i desc");
streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
- mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
+ mstream = new MergeStream(streamA, streamB, new StreamComparator("a_i",ComparatorOrder.DESCENDING));
tuples = getTuples(mstream);
assert(tuples.size() == 5);
@@ -860,7 +836,7 @@ public class StreamingTest extends Abstr
paramsB = mapParams("q","id:(0 3)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
- mstream = new MergeStream(streamA, streamB, new MultipleFieldComparator(new FieldComparator("a_f",ComparatorOrder.ASCENDING),new FieldComparator("a_i",ComparatorOrder.ASCENDING)));
+ mstream = new MergeStream(streamA, streamB, new MultiComp(new StreamComparator("a_f",ComparatorOrder.ASCENDING),new StreamComparator("a_i",ComparatorOrder.ASCENDING)));
tuples = getTuples(mstream);
assert(tuples.size() == 5);
@@ -872,41 +848,12 @@ public class StreamingTest extends Abstr
paramsB = mapParams("q","id:(0 3)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i desc");
streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
- mstream = new MergeStream(streamA, streamB, new MultipleFieldComparator(new FieldComparator("a_f",ComparatorOrder.ASCENDING),new FieldComparator("a_i",ComparatorOrder.DESCENDING)));
+ mstream = new MergeStream(streamA, streamB, new MultiComp(new StreamComparator("a_f",ComparatorOrder.ASCENDING),new StreamComparator("a_i",ComparatorOrder.DESCENDING)));
tuples = getTuples(mstream);
assert(tuples.size() == 5);
assertOrder(tuples, 2,0,1,3,4);
- try {
- paramsA = mapParams("q","id:(2 4 1)","fl","id,a_s,a_i,a_f","sort", "a_f desc,a_i desc");
- streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
- paramsB = mapParams("q","id:(0 3)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i desc");
- streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
- mstream = new MergeStream(streamA, streamB, new MultipleFieldComparator(new FieldComparator("a_f",ComparatorOrder.ASCENDING),new FieldComparator("a_i",ComparatorOrder.DESCENDING)));
- throw new Exception("Sorts did not match up and Exception was not not thrown.");
- } catch(Exception e) {
- if(!e.getMessage().equals("Invalid MergeStream - both substream comparators (sort) must be a superset of this stream's comparator.")) {
- throw e;
- }
- }
-
- try {
- paramsA = mapParams("q","id:(2 4 1)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i desc");
- streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
- paramsB = mapParams("q","id:(0 3)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
- streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
- mstream = new MergeStream(streamA, streamB, new MultipleFieldComparator(new FieldComparator("a_f",ComparatorOrder.ASCENDING),new FieldComparator("a_i",ComparatorOrder.DESCENDING)));
- throw new Exception("Sorts did not match up and Exception was not not thrown.");
- } catch(Exception e) {
- if(!e.getMessage().equals("Invalid MergeStream - both substream comparators (sort) must be a superset of this stream's comparator.")) {
- throw e;
- }
- }
-
-
del("*:*");
commit();
}
@@ -937,8 +884,8 @@ public class StreamingTest extends Abstr
Map paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
- MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
- ParallelStream pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
+ MergeStream mstream = new MergeStream(streamA, streamB, new StreamComparator("a_i",ComparatorOrder.ASCENDING));
+ ParallelStream pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new StreamComparator("a_i",ComparatorOrder.ASCENDING));
List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == 9);
@@ -951,8 +898,8 @@ public class StreamingTest extends Abstr
paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i desc", "partitionKeys", "a_i");
streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
- mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
- pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
+ mstream = new MergeStream(streamA, streamB, new StreamComparator("a_i",ComparatorOrder.DESCENDING));
+ pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new StreamComparator("a_i",ComparatorOrder.DESCENDING));
tuples = getTuples(pstream);
assert(tuples.size() == 8);
@@ -987,9 +934,9 @@ public class StreamingTest extends Abstr
Map paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
- MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
+ MergeStream mstream = new MergeStream(streamA, streamB, new StreamComparator("a_i",ComparatorOrder.ASCENDING));
CountStream cstream = new CountStream(mstream);
- ParallelStream pstream = new ParallelStream(zkHost, "collection1", cstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
+ ParallelStream pstream = new ParallelStream(zkHost, "collection1", cstream, 2, new StreamComparator("a_i",ComparatorOrder.ASCENDING));
List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == 9);