You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2015/03/25 20:42:26 UTC

svn commit: r1669212 [2/2] - in /lucene/dev/branches/branch_5x: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/handler/ solr/core/src/java/org/apache/solr/response/ solr/core/src/java/org/apache/solr/search/ solr/core/src/test-files/solr/collec...

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java?rev=1669212&r1=1665391&r2=1669212&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java Wed Mar 25 19:42:25 2015
@@ -19,6 +19,7 @@ package org.apache.solr.client.solrj.io;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -127,323 +128,85 @@ public class StreamingTest extends Abstr
 
   }
 
-  private void testHashJoinStream() throws Exception {
 
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
-    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "join_i", "1000");
-    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
-    indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "0", "join_i", "2000");
-    indexr(id, "7", "a_s", "hello7", "a_i", "1", "a_f", "0");
-
-    commit();
+  private void testSpacesInParams() throws Exception {
 
     String zkHost = zkServer.getZkAddress();
 
-    //Test one-to-one
-    Map paramsA = mapParams("q","id:(0 1 3 4) ","fl","id,a_s,a_f", "sort", "a_s desc");
-    CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
-    Map fieldMappings = new HashMap();
-    fieldMappings.put("id","streamB.id");
-
-    Map paramsB = mapParams("q","id:(2)","fl","id,a_s,a_f,join_i", "sort", "a_s desc");
-    CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
-    streamB.setFieldMappings(fieldMappings);
-
-    String[] keys = {"a_f"};
-
-    HashJoinStream fstream = new HashJoinStream(streamA, streamB, keys);
-    List<Tuple> tuples = getTuples(fstream);
-
-    assert(tuples.size() == 1);
-    assertOrder(tuples, 0);
-    assertLong(tuples.get(0), "join_i", 1000);
-
-
-    //Test one-to-many
-
-    paramsA = mapParams("q","id:(0 1 3 4) ","fl","id,a_s,a_f", "sort", "a_s desc");
-    streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
-    fieldMappings = new HashMap();
-    fieldMappings.put("id","streamB.id");
-
-    paramsB = mapParams("q","id:(2 6)","fl","id,a_s,a_f,join_i", "sort", "a_s desc");
-    streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
-    streamB.setFieldMappings(fieldMappings);
-
-
-    fstream = new HashJoinStream(streamA, streamB, keys);
-    tuples = getTuples(fstream);
+    Map params = mapParams("q","*:*","fl","id , a_s , a_i , a_f","sort", "a_f  asc , a_i  asc");
 
-    assert(tuples.size() == 2);
-    assertOrder(tuples, 0,0);
-    assertLong(tuples.get(0), "join_i", 1000);
-    assertLong(tuples.get(1), "join_i", 2000);
+    //CloudSolrStream compares the values of the sort with the fl field.
+    //The constructor will throw an exception if the sort fields do not have
+    //a value in the field list.
 
-    //Test many-to-one
-
-    paramsA = mapParams("q","id:(0 2 1 3 4) ","fl","id,a_s,a_f", "sort", "a_s desc");
-    streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
-    fieldMappings = new HashMap();
-    fieldMappings.put("id","streamB.id");
-
-    paramsB = mapParams("q","id:(6)","fl","id,a_s,a_f,join_i", "sort", "a_s desc");
-    streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
-    streamB.setFieldMappings(fieldMappings);
-
-
-    fstream = new HashJoinStream(streamA, streamB, keys);
-    tuples = getTuples(fstream);
-
-    assert(tuples.size() == 2);
-    assertOrder(tuples, 2,0);
-    assertLong(tuples.get(0), "join_i", 2000);
-    assertLong(tuples.get(1), "join_i", 2000);
-
-    //Test many-to-many
-
-    paramsA = mapParams("q","id:(0 7 1 3 4) ","fl","id,a_s,a_f", "sort", "a_s desc");
-    streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
-    fieldMappings = new HashMap();
-    fieldMappings.put("id","streamB.id");
-
-    paramsB = mapParams("q","id:(6 2)","fl","id,a_s,a_f,join_i", "sort", "a_s desc");
-    streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
-    streamB.setFieldMappings(fieldMappings);
-
-
-    fstream = new HashJoinStream(streamA, streamB, keys);
-    tuples = getTuples(fstream);
-
-    assert(tuples.size() == 4);
-    assertOrder(tuples, 7,7,0,0);
-    assertLong(tuples.get(0), "join_i", 1000);
-    assertLong(tuples.get(1), "join_i", 2000);
-    assertLong(tuples.get(2), "join_i", 1000);
-    assertLong(tuples.get(3), "join_i", 2000);
+    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
 
     del("*:*");
     commit();
 
   }
 
-  private void testMergeJoinStream() throws Exception {
+  private void testNonePartitionKeys() throws Exception {
 
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
-    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "join_i", "1000");
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
     indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
     indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
-    indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "0", "join_i", "2000");
-    indexr(id, "7", "a_s", "hello7", "a_i", "1", "a_f", "0");
+    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
 
     commit();
 
     String zkHost = zkServer.getZkAddress();
 
-    //Test one-to-one
-    Map paramsA = mapParams("q","id:(0 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc");
-    CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
-    Map fieldMappings = new HashMap();
-    fieldMappings.put("id","streamB.id");
-
-    Map paramsB = mapParams("q","id:(2)","fl","id,a_s,a_f,join_i", "sort", "a_f desc");
-    CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
-    streamB.setFieldMappings(fieldMappings);
-
-    String[] keys = {"a_f"};
-
-    MergeJoinStream fstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f"));
-    List<Tuple> tuples = getTuples(fstream);
-
-    assert(tuples.size() == 1);
-    assertOrder(tuples, 0);
-    assertLong(tuples.get(0), "join_i", 1000);
-
-
-    //Test one-to-many
-
-    paramsA = mapParams("q","id:(0 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc");
-    streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
-    fieldMappings = new HashMap();
-    fieldMappings.put("id","streamB.id");
-
-    paramsB = mapParams("q","id:(2 6)","fl","id,a_s,a_f,join_i", "sort", "a_f desc");
-    streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
-    streamB.setFieldMappings(fieldMappings);
-
-
-    fstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f"));
-    tuples = getTuples(fstream);
-
-    assert(tuples.size() == 2);
-    assertOrder(tuples, 0,0);
-    assertLong(tuples.get(0), "join_i", 1000);
-    assertLong(tuples.get(1), "join_i", 2000);
-
-    //Test many-to-one
-
-    paramsA = mapParams("q","id:(0 2 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc");
-    streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
-    fieldMappings = new HashMap();
-    fieldMappings.put("id","streamB.id");
-
-    paramsB = mapParams("q","id:(6)","fl","id,a_s,a_f,join_i", "sort", "a_f desc");
-    streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
-    streamB.setFieldMappings(fieldMappings);
-
-
-    fstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f"));
-    tuples = getTuples(fstream);
-
-    assert(tuples.size() == 2);
-    assertOrder(tuples, 2,0);
-    assertLong(tuples.get(0), "join_i", 2000);
-    assertLong(tuples.get(1), "join_i", 2000);
-
-    //Test many-to-many
-
-    paramsA = mapParams("q","id:(0 7 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc");
-    streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
-    fieldMappings = new HashMap();
-    fieldMappings.put("id","streamB.id");
-
-    paramsB = mapParams("q","id:(6 2)","fl","id,a_s,a_f,join_i", "sort", "a_f desc");
-    streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
-    streamB.setFieldMappings(fieldMappings);
-
+    Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "none");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    ParallelStream pstream = new ParallelStream(zkHost, "collection1", stream, 2, new AscFieldComp("a_s"));
 
-    fstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f"));
-    tuples = getTuples(fstream);
+    List<Tuple> tuples = getTuples(pstream);
 
-    assert(tuples.size() == 4);
-    assertOrder(tuples, 7,7,0,0);
-    assertLong(tuples.get(0), "join_i", 1000);
-    assertLong(tuples.get(1), "join_i", 2000);
-    assertLong(tuples.get(2), "join_i", 1000);
-    assertLong(tuples.get(3), "join_i", 2000);
+    assert(tuples.size() == 20); // Each tuple will be double counted.
 
     del("*:*");
     commit();
 
   }
 
-  private void testParallelMergeJoinStream() throws Exception {
+
+
+
+  private void testParallelUniqueStream() throws Exception {
 
     indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
-    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "join_i", "1000");
+    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
     indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
     indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
     indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
-    indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "0", "join_i", "2000");
-    indexr(id, "7", "a_s", "hello7", "a_i", "1", "a_f", "0");
+    indexr(id, "5", "a_s", "hello1", "a_i", "10", "a_f", "1");
+    indexr(id, "6", "a_s", "hello1", "a_i", "11", "a_f", "5");
+    indexr(id, "7", "a_s", "hello1", "a_i", "12", "a_f", "5");
+    indexr(id, "8", "a_s", "hello1", "a_i", "13", "a_f", "4");
 
     commit();
 
     String zkHost = zkServer.getZkAddress();
 
-    //Test one-to-one
-    Map paramsA = mapParams("q","id:(0 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc", "partitionKeys","a_f");
-    CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
-    Map fieldMappings = new HashMap();
-    fieldMappings.put("id","streamB.id");
-
-    Map paramsB = mapParams("q","id:(2)","fl","id,a_s,a_f,join_i", "sort", "a_f desc", "partitionKeys","a_f");
-    CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
-    streamB.setFieldMappings(fieldMappings);
-
-    String[] keys = {"a_f"};
-
-    MergeJoinStream mstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f"));
-    ParallelStream fstream = new ParallelStream(zkHost,"collection1", mstream, 2, new DescFieldComp("a_f"));
-
-    List<Tuple> tuples = getTuples(fstream);
-
-    assert(tuples.size() == 1);
-    assertOrder(tuples, 0);
-    assertLong(tuples.get(0), "join_i", 1000);
-
-
-    //Test one-to-many
-
-    paramsA = mapParams("q","id:(0 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc", "partitionKeys","a_f");
-    streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
-    fieldMappings = new HashMap();
-    fieldMappings.put("id","streamB.id");
-
-    paramsB = mapParams("q","id:(2 6)","fl","id,a_s,a_f,join_i", "sort", "a_f desc", "partitionKeys","a_f");
-    streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
-    streamB.setFieldMappings(fieldMappings);
-
-
-    mstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f"));
-    fstream = new ParallelStream(zkHost,"collection1", mstream, 2, new DescFieldComp("a_f"));
-
-    tuples = getTuples(fstream);
-
-    assert(tuples.size() == 2);
-    assertOrder(tuples, 0,0);
-    assertLong(tuples.get(0), "join_i", 1000);
-    assertLong(tuples.get(1), "join_i", 2000);
-
-    //Test many-to-one
-
-    paramsA = mapParams("q","id:(0 2 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc", "partitionKeys","a_f");
-    streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
-    fieldMappings = new HashMap();
-    fieldMappings.put("id","streamB.id");
-
-    paramsB = mapParams("q","id:(6)","fl","id,a_s,a_f,join_i", "sort", "a_f desc", "partitionKeys","a_f");
-    streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
-    streamB.setFieldMappings(fieldMappings);
-
-
-    mstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f"));
-    fstream = new ParallelStream(zkHost,"collection1", mstream, 2, new DescFieldComp("a_f"));
-
-    tuples = getTuples(fstream);
-
-    assert(tuples.size() == 2);
-    assertOrder(tuples, 2,0);
-    assertLong(tuples.get(0), "join_i", 2000);
-    assertLong(tuples.get(1), "join_i", 2000);
-
-    //Test many-to-many
-
-    paramsA = mapParams("q","id:(0 7 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc", "partitionKeys","a_f");
-    streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
-    fieldMappings = new HashMap();
-    fieldMappings.put("id","streamB.id");
-
-    paramsB = mapParams("q","id:(6 2)","fl","id,a_s,a_f,join_i", "sort", "a_f desc", "partitionKeys","a_f");
-    streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
-    streamB.setFieldMappings(fieldMappings);
-
-
-    mstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f"));
-    fstream = new ParallelStream(zkHost,"collection1", mstream, 2, new DescFieldComp("a_f"));
+    Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc", "partitionKeys", "a_f");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
+    UniqueStream ustream = new UniqueStream(stream, new AscFieldComp("a_f"));
+    ParallelStream pstream = new ParallelStream(zkHost, "collection1", ustream, 2, new AscFieldComp("a_f"));
+    List<Tuple> tuples = getTuples(pstream);
+    assert(tuples.size() == 5);
+    assertOrder(tuples, 0,1,3,4,6);
 
-    tuples = getTuples(fstream);
+    //Test the eofTuples
 
-    assert(tuples.size() == 4);
-    assertOrder(tuples, 7,7,0,0);
-    assertLong(tuples.get(0), "join_i", 1000);
-    assertLong(tuples.get(1), "join_i", 2000);
-    assertLong(tuples.get(2), "join_i", 1000);
-    assertLong(tuples.get(3), "join_i", 2000);
+    Map<String,Tuple> eofTuples = pstream.getEofTuples();
+    assert(eofTuples.size() == 2); //There should be an EOF tuple for each worker.
 
     del("*:*");
     commit();
@@ -478,467 +241,188 @@ public class StreamingTest extends Abstr
     commit();
   }
 
-  private void testRollupStream() throws Exception {
-    indexr(id, "0", "a_s", "hello0", "a_i", "100", "a_f", "0");
-    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "0");
-    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello3", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
-    indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "1");
-    indexr(id, "7", "a_s", "hello1", "a_i", "1", "a_f", "1");
+  private void testParallelRankStream() throws Exception {
 
-    commit();
-
-    String zkHost = zkServer.getZkAddress();
 
-    Bucket[] buckets = {new Bucket("a_s")};
-    Metric[] metrics = {new SumMetric("a_i", false),
-        new MeanMetric("a_i", false),
-        new CountMetric(),
-        new MinMetric("a_i", false),
-        new MaxMetric("a_i", false)};
-
-    Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_s asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
-    RollupStream rstream = new RollupStream(stream, buckets, metrics);
-    rstream.open();
-    Tuple tuple = rstream.read();
-    String b = (String)tuple.get("buckets");
-    List<Double> values = (List<Double>)tuple.get("metricValues");
-    assert(b.equals("hello0"));
-    assert(values.get(0) == 102.0d);
-    assert(values.get(1) == 51.0d);
-    assert(values.get(2) == 2.0d);
-    assert(values.get(3) == 2.0d);
-    assert(values.get(4) == 100.0d);
-
-    tuple = rstream.read();
-    b = (String)tuple.get("buckets");
-    values = (List<Double>)tuple.get("metricValues");
-    assert(b.equals("hello1"));
-    assert(values.get(0) == 3.0d);
-    assert(values.get(1) == 1.0d);
-    assert(values.get(2) == 3.0d);
-    assert(values.get(3) == 1.0d);
-    assert(values.get(4) == 1.0d);
-
-
-    tuple = rstream.read();
-    b = (String)tuple.get("buckets");
-    values = (List<Double>)tuple.get("metricValues");
-    assert(b.equals("hello3"));
-    assert(values.get(0) == 7.0d);
-    assert(values.get(1) == 3.5d);
-    assert(values.get(2) == 2.0d);
-    assert(values.get(3) == 3.0d);
-    assert(values.get(4) == 4.0d);
-
-    tuple = rstream.read();
-    assert(tuple.EOF);
-
-    rstream.close();
-    del("*:*");
-    commit();
-  }
-
-  private void testParallelRollupStream() throws Exception {
-    indexr(id, "0", "a_s", "hello0", "a_i", "100", "a_f", "0");
-    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "0");
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
     indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello3", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
-    indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "1");
-    indexr(id, "7", "a_s", "hello1", "a_i", "1", "a_f", "1");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "5", "a_s", "hello1", "a_i", "5", "a_f", "1");
+    indexr(id, "6", "a_s", "hello1", "a_i", "6", "a_f", "1");
+    indexr(id, "7", "a_s", "hello1", "a_i", "7", "a_f", "1");
+    indexr(id, "8", "a_s", "hello1", "a_i", "8", "a_f", "1");
+    indexr(id, "9", "a_s", "hello1", "a_i", "9", "a_f", "1");
+    indexr(id, "10", "a_s", "hello1", "a_i", "10", "a_f", "1");
 
     commit();
 
     String zkHost = zkServer.getZkAddress();
 
-    Bucket[] buckets = {new Bucket("a_s")};
-    Metric[] metrics = {new SumMetric("a_i", false),
-        new MeanMetric("a_i", false),
-        new CountMetric(),
-        new MinMetric("a_i", false),
-        new MaxMetric("a_i", false)};
-
-    Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_s asc","partitionKeys","a_s");
+    Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
     CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
-    RollupStream rostream = new RollupStream(stream, buckets, metrics);
-    ParallelStream rstream = new ParallelStream(zkHost,"collection1", rostream, 2, new AscFieldComp("buckets"));
-
-    rstream.open();
-    Tuple tuple = rstream.read();
-    String b = (String)tuple.get("buckets");
-    List<Double> values = (List<Double>)tuple.get("metricValues");
-    assert(b.equals("hello0"));
-    assert(values.get(0) == 102.0d);
-    assert(values.get(1) == 51.0d);
-    assert(values.get(2) == 2.0d);
-    assert(values.get(3) == 2.0d);
-    assert(values.get(4) == 100.0d);
-
-    tuple = rstream.read();
-    b = (String)tuple.get("buckets");
-    values = (List<Double>)tuple.get("metricValues");
-    assert(b.equals("hello1"));
-    assert(values.get(0) == 3.0d);
-    assert(values.get(1) == 1.0d);
-    assert(values.get(2) == 3.0d);
-    assert(values.get(3) == 1.0d);
-    assert(values.get(4) == 1.0d);
-
-
-    tuple = rstream.read();
-    b = (String)tuple.get("buckets");
-    values = (List<Double>)tuple.get("metricValues");
-    assert(b.equals("hello3"));
-    assert(values.get(0) == 7.0d);
-    assert(values.get(1) == 3.5d);
-    assert(values.get(2) == 2.0d);
-    assert(values.get(3) == 3.0d);
-    assert(values.get(4) == 4.0d);
+    RankStream rstream = new RankStream(stream, 11, new DescFieldComp("a_i"));
+    ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new DescFieldComp("a_i"));
+    List<Tuple> tuples = getTuples(pstream);
 
-    tuple = rstream.read();
-    assert(tuple.EOF);
+    assert(tuples.size() == 10);
+    assertOrder(tuples, 10,9,8,7,6,5,4,3,2,0);
 
-    rstream.close();
     del("*:*");
     commit();
   }
 
+  private void testTrace() throws Exception {
 
-
-  private void testMetricStream() throws Exception {
-
-    indexr(id, "0", "a_s", "hello0", "a_i", "100", "a_f", "0");
-    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "0");
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
     indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello3", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
-    indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "1");
-    indexr(id, "7", "a_s", "hello1", "a_i", "1", "a_f", "1");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
 
     commit();
 
     String zkHost = zkServer.getZkAddress();
 
-    Bucket[] buckets = {new Bucket("a_s")};
-    Metric[] metrics = {new SumMetric("a_i", false),
-                        new MeanMetric("a_i", false),
-                        new CountMetric(),
-                        new MinMetric("a_i", false),
-                        new MaxMetric("a_i", false)};
-
-    Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
-    MetricStream mstream = new MetricStream(stream, buckets, metrics, "metric1", new DescBucketComp(0),5);
-    getTuples(mstream);
-
-    BucketMetrics[] bucketMetrics = mstream.getBucketMetrics();
-    assert(bucketMetrics.length == 3);
-
-    //Bucket should be is descending order based on Metric 0, which is the SumMetric.
-
-    assert(bucketMetrics[0].getKey().toString().equals("hello0"));
-    assert(bucketMetrics[1].getKey().toString().equals("hello3"));
-    assert(bucketMetrics[2].getKey().toString().equals("hello1"));
-
-    assertMetric(bucketMetrics[0].getMetrics()[0], 102.0d); //Test the first Metric of the first BucketMetrics
-    assertMetric(bucketMetrics[0].getMetrics()[1], 51.0d); //Test the second Metric of the first BucketMetrics
-    assertMetric(bucketMetrics[0].getMetrics()[2], 2.0d); //Test the third Metric of the first BucketMetrics
-    assertMetric(bucketMetrics[0].getMetrics()[3], 2.0d); //Test the fourth Metric of the first BucketMetrics
-    assertMetric(bucketMetrics[0].getMetrics()[4], 100.0d); //Test the fifth Metric of the first BucketMetrics
-
-
-    assertMetric(bucketMetrics[1].getMetrics()[0], 7.0d);
-    assertMetric(bucketMetrics[2].getMetrics()[0], 3.0d);
-
-
-    params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
-    stream = new CloudSolrStream(zkHost, "collection1", params);
-    mstream = new MetricStream(stream, buckets, metrics, "metric1", new AscBucketComp(0),5);
-    getTuples(mstream);
-
-    bucketMetrics = mstream.getBucketMetrics();
-
-    assertMetric(bucketMetrics[0].getMetrics()[0], 3.0d); //Test the first Metric of the first BucketMetrics
-    assertMetric(bucketMetrics[0].getMetrics()[1], 1.0d); //Test the second Metric of the first BucketMetrics
-    assertMetric(bucketMetrics[0].getMetrics()[2], 3.0d); //Test the third Metric of the first BucketMetrics
-    assertMetric(bucketMetrics[0].getMetrics()[3], 1.0d); //Test the fourth Metric of the first BucketMetrics
-    assertMetric(bucketMetrics[0].getMetrics()[4], 1.0d); //Test the fifth Metric of the first BucketMetrics
-
-    assertMetric(bucketMetrics[1].getMetrics()[0], 7.0d);
-    assertMetric(bucketMetrics[2].getMetrics()[0], 102.0d);
-
-    indexr(id, "8", "a_s", "hello4", "a_i", "1000", "a_f", "1"); //Add a fourth record.
-    commit();
-
-    //Test desc comp with more buckets then priority queue can hold.
-    params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
-    stream = new CloudSolrStream(zkHost, "collection1", params);
-    mstream = new MetricStream(stream, buckets, metrics, "metric1", new DescBucketComp(0),3);
-    getTuples(mstream);
-
-    bucketMetrics = mstream.getBucketMetrics();
-    assert(bucketMetrics.length == 3);
-    assert(bucketMetrics[0].getKey().toString().equals("hello4"));
-    assert(bucketMetrics[1].getKey().toString().equals("hello0"));
-    assert(bucketMetrics[2].getKey().toString().equals("hello3"));
-
-    //Test asc comp with more buckets then priority queue can hold.
-    params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
-    stream = new CloudSolrStream(zkHost, "collection1", params);
-    mstream = new MetricStream(stream, buckets, metrics, "metric1", new AscBucketComp(0),3);
-    getTuples(mstream);
-
-    bucketMetrics = mstream.getBucketMetrics();
-    assert(bucketMetrics.length == 3);
-    assert(bucketMetrics[0].getKey().toString().equals("hello1"));
-    assert(bucketMetrics[1].getKey().toString().equals("hello3"));
-    assert(bucketMetrics[2].getKey().toString().equals("hello0"));
-
-
-    //Test with no buckets
-    params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
-    stream = new CloudSolrStream(zkHost, "collection1", params);
-    mstream = new MetricStream(stream, metrics, "metric1");
-    getTuples(mstream);
-
-    bucketMetrics = mstream.getBucketMetrics();
-    assert(bucketMetrics.length == 1);
-    assert(bucketMetrics[0].getKey().toString().equals("metrics"));
-    assertMetric(bucketMetrics[0].getMetrics()[0], 1112.0d); //Test the first Metric of the first BucketMetrics
+    //Test with spaces in the parameter lists.
+    Map paramsA = mapParams("q","*:*","fl","id,a_s, a_i,  a_f","sort", "a_s asc  ,  a_f   asc");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    stream.setTrace(true);
+    List<Tuple> tuples = getTuples(stream);
+    assert(tuples.get(0).get("_COLLECTION_").equals("collection1"));
+    assert(tuples.get(1).get("_COLLECTION_").equals("collection1"));
+    assert(tuples.get(2).get("_COLLECTION_").equals("collection1"));
+    assert(tuples.get(3).get("_COLLECTION_").equals("collection1"));
 
     del("*:*");
     commit();
   }
 
 
-  private void testParallelMetricStream() throws Exception {
-
-    indexr(id, "0", "a_s", "hello0", "a_i", "100", "a_f", "0");
-    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "0");
-    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello3", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
-    indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "1");
-    indexr(id, "7", "a_s", "hello1", "a_i", "1", "a_f", "1");
-
-    commit();
-
-    String zkHost = zkServer.getZkAddress();
-
-    Bucket[] buckets = {new Bucket("a_s")};
-    Metric[] metrics = {new SumMetric("a_i", false),
-        new MeanMetric("a_i", false),
-        new CountMetric(),
-        new MinMetric("a_i", false),
-        new MaxMetric("a_i", false)};
-
-    Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
-    MetricStream mstream = new MetricStream(stream, buckets, metrics, "metric1", new DescBucketComp(0),5);
-    ParallelStream pstream = new ParallelStream(zkHost,"collection1",mstream,2,new AscFieldComp("a_i"));
-    getTuples(pstream);
-
-    BucketMetrics[] bucketMetrics = mstream.getBucketMetrics();
-    assert(bucketMetrics.length == 3);
-
-    //Bucket should be is descending order based on Metric 0, which is the SumMetric.
-
-    assert(bucketMetrics[0].getKey().toString().equals("hello0"));
-    assert(bucketMetrics[1].getKey().toString().equals("hello3"));
-    assert(bucketMetrics[2].getKey().toString().equals("hello1"));
 
-    assertMetric(bucketMetrics[0].getMetrics()[0], 102.0d); //Test the first Metric of the first BucketMetrics
-    assertMetric(bucketMetrics[0].getMetrics()[1], 51.0d); //Test the second Metric of the first BucketMetrics
-    assertMetric(bucketMetrics[0].getMetrics()[2], 2.0d); //Test the third Metric of the first BucketMetrics
-    assertMetric(bucketMetrics[0].getMetrics()[3], 2.0d); //Test the fourth Metric of the first BucketMetrics
-    assertMetric(bucketMetrics[0].getMetrics()[4], 100.0d); //Test the fifth Metric of the first BucketMetrics
 
+  private void testReducerStream() throws Exception {
 
-    assertMetric(bucketMetrics[1].getMetrics()[0], 7.0d);
-    assertMetric(bucketMetrics[2].getMetrics()[0], 3.0d);
-
-
-    params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
-    stream = new CloudSolrStream(zkHost, "collection1", params);
-    mstream = new MetricStream(stream, buckets, metrics, "metric1", new AscBucketComp(0),5);
-    getTuples(mstream);
-
-    bucketMetrics = mstream.getBucketMetrics();
-
-    assertMetric(bucketMetrics[0].getMetrics()[0], 3.0d); //Test the first Metric of the first BucketMetrics
-    assertMetric(bucketMetrics[0].getMetrics()[1], 1.0d); //Test the second Metric of the first BucketMetrics
-    assertMetric(bucketMetrics[0].getMetrics()[2], 3.0d); //Test the third Metric of the first BucketMetrics
-    assertMetric(bucketMetrics[0].getMetrics()[3], 1.0d); //Test the fourth Metric of the first BucketMetrics
-    assertMetric(bucketMetrics[0].getMetrics()[4], 1.0d); //Test the fifth Metric of the first BucketMetrics
-
-    assertMetric(bucketMetrics[1].getMetrics()[0], 7.0d);
-    assertMetric(bucketMetrics[2].getMetrics()[0], 102.0d);
-
-    indexr(id, "8", "a_s", "hello4", "a_i", "1000", "a_f", "1"); //Add a fourth record.
-    commit();
-
-    //Test desc comp with more buckets then priority queue can hold.
-    params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
-    stream = new CloudSolrStream(zkHost, "collection1", params);
-    mstream = new MetricStream(stream, buckets, metrics, "metric1", new DescBucketComp(0),3);
-    getTuples(mstream);
-
-    bucketMetrics = mstream.getBucketMetrics();
-    assert(bucketMetrics.length == 3);
-    assert(bucketMetrics[0].getKey().toString().equals("hello4"));
-    assert(bucketMetrics[1].getKey().toString().equals("hello0"));
-    assert(bucketMetrics[2].getKey().toString().equals("hello3"));
-
-    //Test asc comp with more buckets then priority queue can hold.
-    params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
-    stream = new CloudSolrStream(zkHost, "collection1", params);
-    mstream = new MetricStream(stream, buckets, metrics, "metric1", new AscBucketComp(0),3);
-    getTuples(mstream);
-
-    bucketMetrics = mstream.getBucketMetrics();
-    assert(bucketMetrics.length == 3);
-    assert(bucketMetrics[0].getKey().toString().equals("hello1"));
-    assert(bucketMetrics[1].getKey().toString().equals("hello3"));
-    assert(bucketMetrics[2].getKey().toString().equals("hello0"));
-
-
-    //Test with no buckets
-    params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
-    stream = new CloudSolrStream(zkHost, "collection1", params);
-    mstream = new MetricStream(stream, metrics, "metric1");
-    getTuples(mstream);
-
-    bucketMetrics = mstream.getBucketMetrics();
-    assert(bucketMetrics.length == 1);
-    assert(bucketMetrics[0].getKey().toString().equals("metrics"));
-    assertMetric(bucketMetrics[0].getMetrics()[0], 1112.0d); //Test the first Metric of the first BucketMetrics
-
-    del("*:*");
-    commit();
-  }
-
-  private void testGroupByStream() throws Exception {
-
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
-    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "0");
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
     indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
     indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "1");
+    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
 
     commit();
 
-    //Test CloudSolrStream and SumStream over an int field
     String zkHost = zkServer.getZkAddress();
 
-    Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_s asc");
+    //Test with spaces in the parameter lists.
+    Map paramsA = mapParams("q","*:*","fl","id,a_s, a_i,  a_f","sort", "a_s asc  ,  a_f   asc");
     CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
-    GroupByStream gstream = new GroupByStream(stream, new AscFieldComp("a_s"), new DescFieldComp("a_i"), 5);
+    ReducerStream rstream = new ReducerStream(stream, new AscFieldComp("a_s"));
 
-    List<Tuple> tuples = getTuples(gstream);
+    List<Tuple> tuples = getTuples(rstream);
 
     assert(tuples.size() == 3);
-    assertOrder(tuples, 2,3,4);
-    assertGroupOrder(tuples.get(0), 1, 0);
-
-    del("*:*");
-    commit();
-  }
+    assertOrder(tuples, 0,3,4);
 
+    Tuple t0 = tuples.get(0);
+    List<Map> maps0 = t0.getMaps();
+    assertMaps(maps0, 0, 2,1, 9);
 
-  private void testFilterStream() throws Exception {
+    Tuple t1 = tuples.get(1);
+    List<Map> maps1 = t1.getMaps();
+    assertMaps(maps1, 3, 5, 7, 8);
 
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
-    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
-    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+    Tuple t2 = tuples.get(2);
+    List<Map> maps2 = t2.getMaps();
+    assertMaps(maps2, 4, 6);
 
-    commit();
 
-    //Test CloudSolrStream and SumStream over an int field
-    String zkHost = zkServer.getZkAddress();
-
-    Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_s asc");
-    CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
-    Map paramsB = mapParams("q","id:(0 2)","fl","a_s","sort", "a_s asc");
-    CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
-
-
-    FilterStream fstream = new FilterStream(streamA, streamB, new AscFieldComp("a_s"));
-    List<Tuple> tuples = getTuples(fstream);
-
-    assert(tuples.size() == 2);
-    assertOrder(tuples, 0,2);
 
     del("*:*");
     commit();
   }
 
-  private void testParallelStream() throws Exception {
 
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
-    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+  private void testParallelReducerStream() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
     indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
     indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
 
     commit();
 
     String zkHost = zkServer.getZkAddress();
 
-    Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_s asc", "partitionKeys","a_s");
-    CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
-    Map paramsB = mapParams("q","id:(0 2)","fl","a_s","sort", "a_s asc", "partitionKeys","a_s");
-    CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+    Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    ReducerStream rstream = new ReducerStream(stream, new AscFieldComp("a_s"));
+    ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new AscFieldComp("a_s"));
 
-    FilterStream fstream = new FilterStream(streamA, streamB, new AscFieldComp("a_s"));
-    ParallelStream pstream = new ParallelStream(zkHost,"collection1", fstream, 2, new AscFieldComp("a_s"));
     List<Tuple> tuples = getTuples(pstream);
 
-    assert(tuples.size() == 2);
-    assertOrder(tuples, 0,2);
+    assert(tuples.size() == 3);
+    assertOrder(tuples, 0,3,4);
 
-    del("*:*");
-    commit();
-  }
+    Tuple t0 = tuples.get(0);
+    List<Map> maps0 = t0.getMaps();
+    assertMaps(maps0, 0, 2, 1, 9);
 
+    Tuple t1 = tuples.get(1);
+    List<Map> maps1 = t1.getMaps();
+    assertMaps(maps1, 3, 5, 7, 8);
 
+    Tuple t2 = tuples.get(2);
+    List<Map> maps2 = t2.getMaps();
+    assertMaps(maps2, 4, 6);
 
-  private void testParallelHashJoinStream() {
+    //Test Descending with Ascending subsort
 
-  }
+    paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
+    stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    rstream = new ReducerStream(stream, new DescFieldComp("a_s"));
+    pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new DescFieldComp("a_s"));
 
-  private void testParallelGroupByStream() throws Exception {
+    tuples = getTuples(pstream);
 
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
-    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
-    indexr(id, "3", "a_s", "hello0", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello0", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+    assert(tuples.size() == 3);
+    assertOrder(tuples, 4,3,0);
 
-    commit();
+    t0 = tuples.get(0);
+    maps0 = t0.getMaps();
+    assertMaps(maps0, 4, 6);
+
+
+    t1 = tuples.get(1);
+    maps1 = t1.getMaps();
+    assertMaps(maps1, 3, 5, 7, 8);
+
+
+    t2 = tuples.get(2);
+    maps2 = t2.getMaps();
+    assertMaps(maps2, 0, 2, 1, 9);
 
-    String zkHost = zkServer.getZkAddress();
 
-    Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_s asc", "partitionKeys","a_s");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
-    GroupByStream gstream  = new GroupByStream(stream, new AscFieldComp("a_s"), new AscFieldComp("a_i"),5);
-    ParallelStream pstream = new ParallelStream(zkHost,"collection1", gstream, 2, new AscFieldComp("a_s"));
-    List<Tuple> tuples = getTuples(pstream);
 
-    assert(tuples.size() == 3);
-    assertOrder(tuples, 0,1,2);
-    assertGroupOrder(tuples.get(0),3,4);
     del("*:*");
     commit();
   }
 
-
   private void testTuple() throws Exception {
 
     indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "5.1", "s_multi", "a", "s_multi", "b", "i_multi", "1", "i_multi", "2", "f_multi", "1.2", "f_multi", "1.3");
@@ -947,7 +431,7 @@ public class StreamingTest extends Abstr
 
     String zkHost = zkServer.getZkAddress();
 
-    Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f,s_multi,i_multi,f_multi","sort", "a_s asc", "partitionKeys","a_s");
+    Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f,s_multi,i_multi,f_multi","sort", "a_s asc");
     CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
     List<Tuple> tuples = getTuples(stream);
     Tuple tuple = tuples.get(0);
@@ -978,7 +462,6 @@ public class StreamingTest extends Abstr
     commit();
   }
 
-
   private void testMergeStream() throws Exception {
 
     indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
@@ -1048,6 +531,104 @@ public class StreamingTest extends Abstr
   }
 
 
+  private void testParallelMergeStream() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+    indexr(id, "5", "a_s", "hello0", "a_i", "10", "a_f", "0");
+    indexr(id, "6", "a_s", "hello2", "a_i", "8", "a_f", "0");
+    indexr(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "3");
+    indexr(id, "8", "a_s", "hello4", "a_i", "11", "a_f", "4");
+    indexr(id, "9", "a_s", "hello1", "a_i", "100", "a_f", "1");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+
+    //Test ascending
+    Map paramsA = mapParams("q","id:(4 1 8 7 9)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
+    CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
+
+    Map paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
+    CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+
+    MergeStream mstream = new MergeStream(streamA, streamB, new AscFieldComp("a_i"));
+    ParallelStream pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new AscFieldComp("a_i"));
+    List<Tuple> tuples = getTuples(pstream);
+
+    assert(tuples.size() == 9);
+    assertOrder(tuples, 0,1,2,3,4,7,6,8,9);
+
+    //Test descending
+    paramsA = mapParams("q","id:(4 1 8 9)","fl","id,a_s,a_i","sort", "a_i desc", "partitionKeys", "a_i");
+    streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
+
+    paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i desc", "partitionKeys", "a_i");
+    streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+
+    mstream = new MergeStream(streamA, streamB, new DescFieldComp("a_i"));
+    pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new DescFieldComp("a_i"));
+    tuples = getTuples(pstream);
+
+    assert(tuples.size() == 8);
+    assertOrder(tuples, 9,8,6,4,3,2,1,0);
+
+    del("*:*");
+    commit();
+  }
+
+  private void testParallelEOF() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+    indexr(id, "5", "a_s", "hello0", "a_i", "10", "a_f", "0");
+    indexr(id, "6", "a_s", "hello2", "a_i", "8", "a_f", "0");
+    indexr(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "3");
+    indexr(id, "8", "a_s", "hello4", "a_i", "11", "a_f", "4");
+    indexr(id, "9", "a_s", "hello1", "a_i", "100", "a_f", "1");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+
+    //Test ascending
+    Map paramsA = mapParams("q","id:(4 1 8 7 9)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
+    CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
+
+    Map paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
+    CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+
+    MergeStream mstream = new MergeStream(streamA, streamB, new AscFieldComp("a_i"));
+    CountStream cstream = new CountStream(mstream);
+    ParallelStream pstream = new ParallelStream(zkHost, "collection1", cstream, 2, new AscFieldComp("a_i"));
+    List<Tuple> tuples = getTuples(pstream);
+
+    assert(tuples.size() == 9);
+    Map<String, Tuple> eofTuples = pstream.getEofTuples();
+    assert(eofTuples.size() == 2); // There should be an EOF Tuple for each worker.
+
+    long totalCount = 0;
+
+    Iterator<Tuple> it = eofTuples.values().iterator();
+    while(it.hasNext()) {
+      Tuple t = it.next();
+      totalCount += t.getLong("count");
+    }
+
+    assert(tuples.size() == totalCount);
+
+    del("*:*");
+    commit();
+  }
+
+
+
   @Test
   public void streamTests() throws Exception {
     assertNotNull(cloudClient);
@@ -1072,7 +653,7 @@ public class StreamingTest extends Abstr
     String zkHost = zkServer.getZkAddress();
     Map params = null;
 
-    //Basic CloudSolrStream Test with Ascending Sort
+    //Basic CloudSolrStream Test with Descending Sort
 
     params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i desc");
     CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
@@ -1081,7 +662,7 @@ public class StreamingTest extends Abstr
     assert(tuples.size() == 5);
     assertOrder(tuples, 4, 3, 2, 1, 0);
 
-    //With Descending Sort
+    //With Ascending Sort
     params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
     stream = new CloudSolrStream(zkHost, "collection1", params);
     tuples = getTuples(stream);
@@ -1110,21 +691,18 @@ public class StreamingTest extends Abstr
     commit();
 
     testTuple();
+    testSpacesInParams();
+    testNonePartitionKeys();
+    testTrace();
     testUniqueStream();
-    testMetricStream();
-    testRollupStream();
     testRankStream();
-    testFilterStream();
-    testGroupByStream();
-    testHashJoinStream();
-    testMergeJoinStream();
     testMergeStream();
-    testParallelStream();
-    testParallelRollupStream();
-    testParallelMetricStream();
-    testParallelGroupByStream();
-    testParallelHashJoinStream();
-    testParallelMergeJoinStream();
+    testReducerStream();
+    testParallelEOF();
+    testParallelUniqueStream();
+    testParallelRankStream();
+    testParallelMergeStream();
+    testParallelReducerStream();
   }
 
   protected Map mapParams(String... vals) {
@@ -1179,19 +757,27 @@ public class StreamingTest extends Abstr
     return true;
   }
 
-  public boolean assertLong(Tuple tuple, String fieldName, long l) throws Exception {
-    long lv = (long)tuple.get(fieldName);
-    if(lv != l) {
-      throw new Exception("Longs not equal:"+l+" : "+lv);
+  protected boolean assertMaps(List<Map> maps, int... ids) throws Exception {
+    if(maps.size() != ids.length) {
+      throw new Exception("Expected id count != actual map count:"+ids.length+":"+maps.size());
     }
 
+    int i=0;
+    for(int val : ids) {
+      Map t = maps.get(i);
+      Long tip = (Long)t.get("id");
+      if(tip.intValue() != val) {
+        throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
+      }
+      ++i;
+    }
     return true;
   }
 
-  public boolean assertMetric(Metric metric, double value) throws Exception {
-    Double d = metric.getValue();
-    if(d.doubleValue() != value) {
-      throw new Exception("Unexpected Metric "+d+"!="+value);
+  public boolean assertLong(Tuple tuple, String fieldName, long l) throws Exception {
+    long lv = (long)tuple.get(fieldName);
+    if(lv != l) {
+      throw new Exception("Longs not equal:"+l+" : "+lv);
     }
 
     return true;