You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2018/04/05 18:00:18 UTC

[4/8] lucene-solr:master: SOLR-12183: Refactor Streaming Expression test cases

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/80375acb/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index 7e8b6c6..4d88b4e 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -25,8 +25,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 
-import org.apache.commons.math3.distribution.NormalDistribution;
-import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
@@ -35,20 +33,6 @@ import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
 import org.apache.solr.client.solrj.io.comp.FieldComparator;
-import org.apache.solr.client.solrj.io.eval.AddEvaluator;
-import org.apache.solr.client.solrj.io.eval.AndEvaluator;
-import org.apache.solr.client.solrj.io.eval.EqualToEvaluator;
-import org.apache.solr.client.solrj.io.eval.GreaterThanEqualToEvaluator;
-import org.apache.solr.client.solrj.io.eval.GreaterThanEvaluator;
-import org.apache.solr.client.solrj.io.eval.IfThenElseEvaluator;
-import org.apache.solr.client.solrj.io.eval.LessThanEqualToEvaluator;
-import org.apache.solr.client.solrj.io.eval.LessThanEvaluator;
-import org.apache.solr.client.solrj.io.eval.NotEvaluator;
-import org.apache.solr.client.solrj.io.eval.OrEvaluator;
-import org.apache.solr.client.solrj.io.eval.RawValueEvaluator;
-import org.apache.solr.client.solrj.io.ops.ConcatOperation;
-import org.apache.solr.client.solrj.io.ops.GroupOperation;
-import org.apache.solr.client.solrj.io.ops.ReplaceOperation;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
@@ -68,12 +52,6 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-/**
- *  All base tests will be done with CloudSolrStream. Under the covers CloudSolrStream uses SolrStream so
- *  SolrStream will get fully exercised through these tests.
- *
- **/
-
 @Slow
 @LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40","Lucene41","Lucene42","Lucene45"})
 public class StreamExpressionTest extends SolrCloudTestCase {
@@ -293,9 +271,6 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     }
   }
 
-
-
-
   @Test
   public void testCloudSolrStreamWithZkHost() throws Exception {
 
@@ -422,197 +397,6 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     assertOrder(tuples, 0, 2, 1, 3, 4);
   }
 
-  @Test
-  public void testUniqueStream() throws Exception {
-
-    new UpdateRequest()
-        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
-        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
-        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
-        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
-        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
-        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
-
-    StreamExpression expression;
-    TupleStream stream;
-    List<Tuple> tuples;
-    StreamContext streamContext = new StreamContext();
-    SolrClientCache solrClientCache = new SolrClientCache();
-    streamContext.setSolrClientCache(solrClientCache);
-
-    StreamFactory factory = new StreamFactory()
-      .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
-      .withFunctionName("search", CloudSolrStream.class)
-      .withFunctionName("unique", UniqueStream.class);
-
-    try {
-      // Basic test
-      expression = StreamExpressionParser.parse("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f\")");
-      stream = new UniqueStream(expression, factory);
-      stream.setStreamContext(streamContext);
-      tuples = getTuples(stream);
-
-      assert (tuples.size() == 4);
-      assertOrder(tuples, 0, 1, 3, 4);
-
-      // Basic test desc
-      expression = StreamExpressionParser.parse("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc, a_i desc\"), over=\"a_f\")");
-      stream = new UniqueStream(expression, factory);
-      stream.setStreamContext(streamContext);
-      tuples = getTuples(stream);
-
-      assert (tuples.size() == 4);
-      assertOrder(tuples, 4, 3, 1, 2);
-
-      // Basic w/multi comp
-      expression = StreamExpressionParser.parse("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f, a_i\")");
-      stream = new UniqueStream(expression, factory);
-      stream.setStreamContext(streamContext);
-      tuples = getTuples(stream);
-
-      assert (tuples.size() == 5);
-      assertOrder(tuples, 0, 2, 1, 3, 4);
-
-      // full factory w/multi comp
-      stream = factory.constructStream("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f, a_i\")");
-      stream.setStreamContext(streamContext);
-      tuples = getTuples(stream);
-
-      assert (tuples.size() == 5);
-      assertOrder(tuples, 0, 2, 1, 3, 4);
-    } finally {
-      solrClientCache.close();
-    }
-  }
-
-  @Test
-  public void testSortStream() throws Exception {
-
-    new UpdateRequest()
-        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
-        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
-        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
-        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
-        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
-        .add(id, "5", "a_s", "hello1", "a_i", "1", "a_f", "2")
-        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
-
-    StreamExpression expression;
-    TupleStream stream;
-    List<Tuple> tuples;
-    StreamContext streamContext = new StreamContext();
-    SolrClientCache solrClientCache = new SolrClientCache();
-    streamContext.setSolrClientCache(solrClientCache);
-    try {
-      StreamFactory factory = new StreamFactory()
-          .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
-          .withFunctionName("search", CloudSolrStream.class)
-          .withFunctionName("sort", SortStream.class);
-
-      // Basic test
-      stream = factory.constructStream("sort(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc\")");
-      stream.setStreamContext(streamContext);
-      tuples = getTuples(stream);
-      assert (tuples.size() == 6);
-      assertOrder(tuples, 0, 1, 5, 2, 3, 4);
-
-      // Basic test desc
-      stream = factory.constructStream("sort(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i desc\")");
-      stream.setStreamContext(streamContext);
-      tuples = getTuples(stream);
-      assert (tuples.size() == 6);
-      assertOrder(tuples, 4, 3, 2, 1, 5, 0);
-
-      // Basic w/multi comp
-      stream = factory.constructStream("sort(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc, a_f desc\")");
-      stream.setStreamContext(streamContext);
-      tuples = getTuples(stream);
-      assert (tuples.size() == 6);
-      assertOrder(tuples, 0, 5, 1, 2, 3, 4);
-    } finally {
-      solrClientCache.close();
-    }
-  }
-
-
-  @Test
-  public void testNullStream() throws Exception {
-
-    new UpdateRequest()
-        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
-        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
-        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
-        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
-        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
-        .add(id, "5", "a_s", "hello1", "a_i", "1", "a_f", "2")
-        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
-
-    StreamExpression expression;
-    TupleStream stream;
-    List<Tuple> tuples;
-    StreamContext streamContext = new StreamContext();
-    SolrClientCache solrClientCache = new SolrClientCache();
-    streamContext.setSolrClientCache(solrClientCache);
-    StreamFactory factory = new StreamFactory()
-        .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
-        .withFunctionName("search", CloudSolrStream.class)
-        .withFunctionName("null", NullStream.class);
-
-    try {
-      // Basic test
-      stream = factory.constructStream("null(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc\")");
-      stream.setStreamContext(streamContext);
-      tuples = getTuples(stream);
-      assertTrue(tuples.size() == 1);
-      assertTrue(tuples.get(0).getLong("nullCount") == 6);
-    } finally {
-      solrClientCache.close();
-    }
-  }
-
-
-  @Test
-  public void testParallelNullStream() throws Exception {
-
-    new UpdateRequest()
-        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
-        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
-        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
-        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
-        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
-        .add(id, "5", "a_s", "hello1", "a_i", "1", "a_f", "2")
-        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
-
-    StreamExpression expression;
-    TupleStream stream;
-    List<Tuple> tuples;
-    StreamContext streamContext = new StreamContext();
-    SolrClientCache solrClientCache = new SolrClientCache();
-    streamContext.setSolrClientCache(solrClientCache);
-
-    StreamFactory factory = new StreamFactory()
-        .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
-        .withFunctionName("search", CloudSolrStream.class)
-        .withFunctionName("null", NullStream.class)
-        .withFunctionName("parallel", ParallelStream.class);
-
-    try {
-
-      // Basic test
-      stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"nullCount desc\", null(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), by=\"a_i asc\"))");
-      stream.setStreamContext(streamContext);
-      tuples = getTuples(stream);
-      assertTrue(tuples.size() == 2);
-      long nullCount = 0;
-      for (Tuple t : tuples) {
-        nullCount += t.getLong("nullCount");
-      }
-
-      assertEquals(nullCount, 6L);
-    } finally {
-      solrClientCache.close();
-    }
-  }
 
   @Test
   public void testNulls() throws Exception {
@@ -684,174 +468,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     }
   }
 
-  @Test
-  public void testMergeStream() throws Exception {
-
-    new UpdateRequest()
-        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
-        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
-        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
-        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
-        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
-        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
-
-    StreamExpression expression;
-    TupleStream stream;
-    List<Tuple> tuples;
-    
-    StreamFactory factory = new StreamFactory()
-      .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
-      .withFunctionName("search", CloudSolrStream.class)
-      .withFunctionName("unique", UniqueStream.class)
-      .withFunctionName("merge", MergeStream.class);
-    
-    // Basic test
-    expression = StreamExpressionParser.parse("merge("
-        + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"),"
-        + "search(" + COLLECTIONORALIAS + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"),"
-        + "on=\"a_f asc\")");
-
-    stream = new MergeStream(expression, factory);
-    StreamContext streamContext = new StreamContext();
-    SolrClientCache solrClientCache = new SolrClientCache();
-    streamContext.setSolrClientCache(solrClientCache);
-    try {
-      stream.setStreamContext(streamContext);
-      tuples = getTuples(stream);
-
-      assert (tuples.size() == 4);
-      assertOrder(tuples, 0, 1, 3, 4);
-
-      // Basic test desc
-      expression = StreamExpressionParser.parse("merge("
-          + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
-          + "search(" + COLLECTIONORALIAS + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
-          + "on=\"a_f desc\")");
-      stream = new MergeStream(expression, factory);
-      stream.setStreamContext(streamContext);
-      tuples = getTuples(stream);
-
-      assert (tuples.size() == 4);
-      assertOrder(tuples, 4, 3, 1, 0);
-
-      // Basic w/multi comp
-      expression = StreamExpressionParser.parse("merge("
-          + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
-          + "search(" + COLLECTIONORALIAS + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
-          + "on=\"a_f asc, a_s asc\")");
-      stream = new MergeStream(expression, factory);
-      stream.setStreamContext(streamContext);
-      tuples = getTuples(stream);
-
-      assert (tuples.size() == 5);
-      assertOrder(tuples, 0, 2, 1, 3, 4);
-
-      // full factory w/multi comp
-      stream = factory.constructStream("merge("
-          + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
-          + "search(" + COLLECTIONORALIAS + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
-          + "on=\"a_f asc, a_s asc\")");
-      stream.setStreamContext(streamContext);
-      tuples = getTuples(stream);
-
-      assert (tuples.size() == 5);
-      assertOrder(tuples, 0, 2, 1, 3, 4);
-
-      // full factory w/multi streams
-      stream = factory.constructStream("merge("
-          + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
-          + "search(" + COLLECTIONORALIAS + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
-          + "search(" + COLLECTIONORALIAS + ", q=\"id:(2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
-          + "on=\"a_f asc\")");
-      stream.setStreamContext(streamContext);
-      tuples = getTuples(stream);
-
-      assert (tuples.size() == 4);
-      assertOrder(tuples, 0, 2, 1, 4);
-    } finally {
-      solrClientCache.close();
-    }
-  }
-
-  @Test
-  public void testRankStream() throws Exception {
-
-    new UpdateRequest()
-        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
-        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
-        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
-        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
-        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
-        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
-
-    StreamExpression expression;
-    TupleStream stream;
-    List<Tuple> tuples;
-    StreamContext streamContext = new StreamContext();
-    SolrClientCache solrClientCache = new SolrClientCache();
-    streamContext.setSolrClientCache(solrClientCache);
-
-    StreamFactory factory = new StreamFactory()
-      .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
-      .withFunctionName("search", CloudSolrStream.class)
-      .withFunctionName("unique", UniqueStream.class)
-      .withFunctionName("top", RankStream.class);
-    try {
-      // Basic test
-      expression = StreamExpressionParser.parse("top("
-          + "n=3,"
-          + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
-          + "sort=\"a_f asc, a_i asc\")");
-      stream = new RankStream(expression, factory);
-      stream.setStreamContext(streamContext);
-      tuples = getTuples(stream);
-
-      assert (tuples.size() == 3);
-      assertOrder(tuples, 0, 2, 1);
-
-      // Basic test desc
-      expression = StreamExpressionParser.parse("top("
-          + "n=2,"
-          + "unique("
-          + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
-          + "over=\"a_f\"),"
-          + "sort=\"a_f desc\")");
-      stream = new RankStream(expression, factory);
-      stream.setStreamContext(streamContext);
-      tuples = getTuples(stream);
-
-      assert (tuples.size() == 2);
-      assertOrder(tuples, 4, 3);
-
-      // full factory
-      stream = factory.constructStream("top("
-          + "n=4,"
-          + "unique("
-          + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
-          + "over=\"a_f\"),"
-          + "sort=\"a_f asc\")");
-      stream.setStreamContext(streamContext);
-      tuples = getTuples(stream);
-
-      assert (tuples.size() == 4);
-      assertOrder(tuples, 0, 1, 3, 4);
-
-      // full factory, switch order
-      stream = factory.constructStream("top("
-          + "n=4,"
-          + "unique("
-          + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc, a_i desc\"),"
-          + "over=\"a_f\"),"
-          + "sort=\"a_f asc\")");
-      stream.setStreamContext(streamContext);
-      tuples = getTuples(stream);
 
-      assert (tuples.size() == 4);
-      assertOrder(tuples, 2, 1, 3, 4);
-    } finally {
-      solrClientCache.close();
-    }
-  }
 
   @Test
   public void testRandomStream() throws Exception {
@@ -983,8 +600,16 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     }
   }
 
+
+
+
+
+
+
+
+
   @Test
-  public void testReducerStream() throws Exception {
+  public void testStatsStream() throws Exception {
 
     new UpdateRequest()
         .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
@@ -998,8216 +623,1105 @@ public class StreamExpressionTest extends SolrCloudTestCase {
         .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
-    
-    StreamExpression expression;
-    TupleStream stream;
-    List<Tuple> tuples;
-    Tuple t0, t1, t2;
-    List<Map> maps0, maps1, maps2;
-    StreamContext streamContext = new StreamContext();
-    SolrClientCache solrClientCache = new SolrClientCache();
-    streamContext.setSolrClientCache(solrClientCache);
 
     StreamFactory factory = new StreamFactory()
-        .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
-        .withFunctionName("search", CloudSolrStream.class)
-        .withFunctionName("reduce", ReducerStream.class)
-        .withFunctionName("group", GroupOperation.class);
-
+    .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+    .withFunctionName("stats", StatsStream.class)
+    .withFunctionName("sum", SumMetric.class)
+    .withFunctionName("min", MinMetric.class)
+    .withFunctionName("max", MaxMetric.class)
+    .withFunctionName("avg", MeanMetric.class)
+    .withFunctionName("count", CountMetric.class);     
+  
+    StreamExpression expression;
+    TupleStream stream;
+    List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache cache = new SolrClientCache();
     try {
-      // basic
-      expression = StreamExpressionParser.parse("reduce("
-          + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\"),"
-          + "by=\"a_s\","
-          + "group(sort=\"a_f desc\", n=\"4\"))");
-
+      streamContext.setSolrClientCache(cache);
+      String expr = "stats(" + COLLECTIONORALIAS + ", q=*:*, sum(a_i), sum(a_f), min(a_i), min(a_f), max(a_i), max(a_f), avg(a_i), avg(a_f), count(*))";
+      expression = StreamExpressionParser.parse(expr);
       stream = factory.constructStream(expression);
       stream.setStreamContext(streamContext);
-      tuples = getTuples(stream);
-
-      assert (tuples.size() == 3);
-
-      t0 = tuples.get(0);
-      maps0 = t0.getMaps("group");
-      assertMaps(maps0, 9, 1, 2, 0);
-
-      t1 = tuples.get(1);
-      maps1 = t1.getMaps("group");
-      assertMaps(maps1, 8, 7, 5, 3);
 
-
-      t2 = tuples.get(2);
-      maps2 = t2.getMaps("group");
-      assertMaps(maps2, 6, 4);
-
-      // basic w/spaces
-      expression = StreamExpressionParser.parse("reduce("
-          + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f       asc\"),"
-          + "by=\"a_s\"," +
-          "group(sort=\"a_i asc\", n=\"2\"))");
-      stream = factory.constructStream(expression);
-      stream.setStreamContext(streamContext);
       tuples = getTuples(stream);
 
-      assert (tuples.size() == 3);
-
-      t0 = tuples.get(0);
-      maps0 = t0.getMaps("group");
-      assert (maps0.size() == 2);
-
-      assertMaps(maps0, 0, 1);
-
-      t1 = tuples.get(1);
-      maps1 = t1.getMaps("group");
-      assertMaps(maps1, 3, 5);
-
-      t2 = tuples.get(2);
-      maps2 = t2.getMaps("group");
-      assertMaps(maps2, 4, 6);
-    } finally {
-      solrClientCache.close();
-    }
-  }
-
-
-  @Test
-  public void testHavingStream() throws Exception {
-
-    SolrClientCache solrClientCache = new SolrClientCache();
-
-    new UpdateRequest()
-        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1", "subject", "blah blah blah 0")
-        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2", "subject", "blah blah blah 2")
-        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "subject", "blah blah blah 3")
-        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "subject", "blah blah blah 4")
-        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5", "subject", "blah blah blah 1")
-        .add(id, "5", "a_s", "hello3", "a_i", "5", "a_f", "6", "subject", "blah blah blah 5")
-        .add(id, "6", "a_s", "hello4", "a_i", "6", "a_f", "7", "subject", "blah blah blah 6")
-        .add(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "8", "subject", "blah blah blah 7")
-        .add(id, "8", "a_s", "hello3", "a_i", "8", "a_f", "9", "subject", "blah blah blah 8")
-        .add(id, "9", "a_s", "hello0", "a_i", "9", "a_f", "10", "subject", "blah blah blah 9")
-        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+      assert (tuples.size() == 1);
 
-    TupleStream stream;
-    List<Tuple> tuples;
+      //Test Long and Double Sums
 
-    StreamFactory factory = new StreamFactory()
-        .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
-        .withFunctionName("search", CloudSolrStream.class)
-        .withFunctionName("having", HavingStream.class)
-        .withFunctionName("rollup", RollupStream.class)
-        .withFunctionName("sum", SumMetric.class)
-        .withFunctionName("and", AndEvaluator.class)
-        .withFunctionName("or", OrEvaluator.class)
-        .withFunctionName("not", NotEvaluator.class)
-        .withFunctionName("gt", GreaterThanEvaluator.class)
-        .withFunctionName("lt", LessThanEvaluator.class)
-        .withFunctionName("eq", EqualToEvaluator.class)
-        .withFunctionName("lteq", LessThanEqualToEvaluator.class)
-        .withFunctionName("gteq", GreaterThanEqualToEvaluator.class);
-
-    stream = factory.constructStream("having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), eq(a_i, 9))");
-    StreamContext context = new StreamContext();
-    context.setSolrClientCache(solrClientCache);
-    stream.setStreamContext(context);
-    tuples = getTuples(stream);
+      Tuple tuple = tuples.get(0);
 
-    assert(tuples.size() == 1);
-    Tuple t = tuples.get(0);
-    assertTrue(t.getString("id").equals("9"));
+      Double sumi = tuple.getDouble("sum(a_i)");
+      Double sumf = tuple.getDouble("sum(a_f)");
+      Double mini = tuple.getDouble("min(a_i)");
+      Double minf = tuple.getDouble("min(a_f)");
+      Double maxi = tuple.getDouble("max(a_i)");
+      Double maxf = tuple.getDouble("max(a_f)");
+      Double avgi = tuple.getDouble("avg(a_i)");
+      Double avgf = tuple.getDouble("avg(a_f)");
+      Double count = tuple.getDouble("count(*)");
 
-    stream = factory.constructStream("having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), and(eq(a_i, 9),lt(a_i, 10)))");
-    context = new StreamContext();
-    context.setSolrClientCache(solrClientCache);
-    stream.setStreamContext(context);
-    tuples = getTuples(stream);
+      assertTrue(sumi.longValue() == 70);
+      assertTrue(sumf.doubleValue() == 55.0D);
+      assertTrue(mini.doubleValue() == 0.0D);
+      assertTrue(minf.doubleValue() == 1.0D);
+      assertTrue(maxi.doubleValue() == 14.0D);
+      assertTrue(maxf.doubleValue() == 10.0D);
+      assertTrue(avgi.doubleValue() == 7.0D);
+      assertTrue(avgf.doubleValue() == 5.5D);
+      assertTrue(count.doubleValue() == 10);
 
-    assert(tuples.size() == 1);
-    t = tuples.get(0);
-    assertTrue(t.getString("id").equals("9"));
 
-    stream = factory.constructStream("having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), or(eq(a_i, 9),eq(a_i, 8)))");
-    context = new StreamContext();
-    context.setSolrClientCache(solrClientCache);
-    stream.setStreamContext(context);
-    tuples = getTuples(stream);
+      //Test with shards parameter
+      List<String> shardUrls = TupleStream.getShards(cluster.getZkServer().getZkAddress(), COLLECTIONORALIAS, streamContext);
+      expr = "stats(myCollection, q=*:*, sum(a_i), sum(a_f), min(a_i), min(a_f), max(a_i), max(a_f), avg(a_i), avg(a_f), count(*))";
+      Map<String, List<String>> shardsMap = new HashMap();
+      shardsMap.put("myCollection", shardUrls);
+      StreamContext context = new StreamContext();
+      context.put("shards", shardsMap);
+      context.setSolrClientCache(cache);
+      stream = factory.constructStream(expr);
+      stream.setStreamContext(context);
 
-    assert(tuples.size() == 2);
-    t = tuples.get(0);
-    assertTrue(t.getString("id").equals("8"));
+      tuples = getTuples(stream);
 
-    t = tuples.get(1);
-    assertTrue(t.getString("id").equals("9"));
+      assert (tuples.size() == 1);
 
+      //Test Long and Double Sums
 
-    stream = factory.constructStream("having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), and(eq(a_i, 9),not(eq(a_i, 9))))");
-    context = new StreamContext();
-    context.setSolrClientCache(solrClientCache);
-    stream.setStreamContext(context);
-    tuples = getTuples(stream);
+      tuple = tuples.get(0);
 
-    assert(tuples.size() == 0);
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
 
+      assertTrue(sumi.longValue() == 70);
+      assertTrue(sumf.doubleValue() == 55.0D);
+      assertTrue(mini.doubleValue() == 0.0D);
+      assertTrue(minf.doubleValue() == 1.0D);
+      assertTrue(maxi.doubleValue() == 14.0D);
+      assertTrue(maxf.doubleValue() == 10.0D);
+      assertTrue(avgi.doubleValue() == 7.0D);
+      assertTrue(avgf.doubleValue() == 5.5D);
+      assertTrue(count.doubleValue() == 10);
 
-    stream = factory.constructStream("having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), and(lteq(a_i, 9), gteq(a_i, 8)))");
-    context = new StreamContext();
-    context.setSolrClientCache(solrClientCache);
-    stream.setStreamContext(context);
-    tuples = getTuples(stream);
+      //Exercise the /stream handler
 
-    assert(tuples.size() == 2);
+      //Add the shards http parameter for the myCollection
+      StringBuilder buf = new StringBuilder();
+      for (String shardUrl : shardUrls) {
+        if (buf.length() > 0) {
+          buf.append(",");
+        }
+        buf.append(shardUrl);
+      }
 
-    t = tuples.get(0);
-    assertTrue(t.getString("id").equals("8"));
+      ModifiableSolrParams solrParams = new ModifiableSolrParams();
+      solrParams.add("qt", "/stream");
+      solrParams.add("expr", expr);
+      solrParams.add("myCollection.shards", buf.toString());
+      SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams);
+      tuples = getTuples(solrStream);
+      assert (tuples.size() == 1);
 
-    t = tuples.get(1);
-    assertTrue(t.getString("id").equals("9"));
+      tuple =tuples.get(0);
 
-    stream = factory.constructStream("having(rollup(over=a_f, sum(a_i), search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\")), and(eq(sum(a_i), 9),eq(sum(a_i), 9)))");
-    context = new StreamContext();
-    context.setSolrClientCache(solrClientCache);
-    stream.setStreamContext(context);
-    tuples = getTuples(stream);
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
 
-    assert(tuples.size() == 1);
-    t = tuples.get(0);
-    assertTrue(t.getDouble("a_f") == 10.0D);
+      assertTrue(sumi.longValue() == 70);
+      assertTrue(sumf.doubleValue() == 55.0D);
+      assertTrue(mini.doubleValue() == 0.0D);
+      assertTrue(minf.doubleValue() == 1.0D);
+      assertTrue(maxi.doubleValue() == 14.0D);
+      assertTrue(maxf.doubleValue() == 10.0D);
+      assertTrue(avgi.doubleValue() == 7.0D);
+      assertTrue(avgf.doubleValue() == 5.5D);
+      assertTrue(count.doubleValue() == 10);
+      //Add a negative test to prove that it cannot find slices if shards parameter is removed
 
-    solrClientCache.close();
+      try {
+        ModifiableSolrParams solrParamsBad = new ModifiableSolrParams();
+        solrParamsBad.add("qt", "/stream");
+        solrParamsBad.add("expr", expr);
+        solrStream = new SolrStream(shardUrls.get(0), solrParamsBad);
+        tuples = getTuples(solrStream);
+        throw new Exception("Exception should have been thrown above");
+      } catch (IOException e) {
+        assertTrue(e.getMessage().contains("Collection not found: myCollection"));
+      }
+    } finally {
+      cache.close();
+    }
   }
 
-
   @Test
-  public void testParallelHavingStream() throws Exception {
-
-    SolrClientCache solrClientCache = new SolrClientCache();
+  public void testFacetStream() throws Exception {
 
     new UpdateRequest()
-        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1", "subject", "blah blah blah 0")
-        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2", "subject", "blah blah blah 2")
-        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "subject", "blah blah blah 3")
-        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "subject", "blah blah blah 4")
-        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5", "subject", "blah blah blah 1")
-        .add(id, "5", "a_s", "hello3", "a_i", "5", "a_f", "6", "subject", "blah blah blah 5")
-        .add(id, "6", "a_s", "hello4", "a_i", "6", "a_f", "7", "subject", "blah blah blah 6")
-        .add(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "8", "subject", "blah blah blah 7")
-        .add(id, "8", "a_s", "hello3", "a_i", "8", "a_f", "9", "subject", "blah blah blah 8")
-        .add(id, "9", "a_s", "hello0", "a_i", "9", "a_f", "10", "subject", "blah blah blah 9")
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
-
+    
+    String clause;
     TupleStream stream;
     List<Tuple> tuples;
-
+    
     StreamFactory factory = new StreamFactory()
-        .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
-        .withFunctionName("search", CloudSolrStream.class)
-        .withFunctionName("having", HavingStream.class)
-        .withFunctionName("rollup", RollupStream.class)
-        .withFunctionName("sum", SumMetric.class)
-        .withFunctionName("and", AndEvaluator.class)
-        .withFunctionName("or", OrEvaluator.class)
-        .withFunctionName("not", NotEvaluator.class)
-        .withFunctionName("gt", GreaterThanEvaluator.class)
-        .withFunctionName("lt", LessThanEvaluator.class)
-        .withFunctionName("eq", EqualToEvaluator.class)
-        .withFunctionName("lteq", LessThanEqualToEvaluator.class)
-        .withFunctionName("gteq", GreaterThanEqualToEvaluator.class)
-        .withFunctionName("val", RawValueEvaluator.class)
-        .withFunctionName("parallel", ParallelStream.class);
-
-    stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), eq(a_i, 9)))");
-    StreamContext context = new StreamContext();
-    context.setSolrClientCache(solrClientCache);
-    stream.setStreamContext(context);
-    tuples = getTuples(stream);
-
-    assert(tuples.size() == 1);
-    Tuple t = tuples.get(0);
-    assertTrue(t.getString("id").equals("9"));
-
-    stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), and(eq(a_i, 9),lt(a_i, 10))))");
-    context = new StreamContext();
-    context.setSolrClientCache(solrClientCache);
-    stream.setStreamContext(context);
+      .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
+      .withFunctionName("facet", FacetStream.class)
+      .withFunctionName("sum", SumMetric.class)
+      .withFunctionName("min", MinMetric.class)
+      .withFunctionName("max", MaxMetric.class)
+      .withFunctionName("avg", MeanMetric.class)
+      .withFunctionName("count", CountMetric.class);
+    
+    // Basic test
+    clause = "facet("
+              +   "collection1, "
+              +   "q=\"*:*\", "
+              +   "fl=\"a_s,a_i,a_f\", "
+              +   "sort=\"a_s asc\", "
+              +   "buckets=\"a_s\", "
+              +   "bucketSorts=\"sum(a_i) asc\", "
+              +   "bucketSizeLimit=100, "
+              +   "sum(a_i), sum(a_f), "
+              +   "min(a_i), min(a_f), "
+              +   "max(a_i), max(a_f), "
+              +   "avg(a_i), avg(a_f), "
+              +   "count(*)"
+              + ")";
+    
+    stream = factory.constructStream(clause);
     tuples = getTuples(stream);
 
-    assert(tuples.size() == 1);
-    t = tuples.get(0);
-    assertTrue(t.getString("id").equals("9"));
+    assert(tuples.size() == 3);
 
-    stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\",having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), or(eq(a_i, 9),eq(a_i, 8))))");
-    context = new StreamContext();
-    context.setSolrClientCache(solrClientCache);
-    stream.setStreamContext(context);
-    tuples = getTuples(stream);
+    //Test Long and Double Sums
 
-    assert(tuples.size() == 2);
-    t = tuples.get(0);
-    assertTrue(t.getString("id").equals("8"));
+    Tuple tuple = tuples.get(0);
+    String bucket = tuple.getString("a_s");
+    Double sumi = tuple.getDouble("sum(a_i)");
+    Double sumf = tuple.getDouble("sum(a_f)");
+    Double mini = tuple.getDouble("min(a_i)");
+    Double minf = tuple.getDouble("min(a_f)");
+    Double maxi = tuple.getDouble("max(a_i)");
+    Double maxf = tuple.getDouble("max(a_f)");
+    Double avgi = tuple.getDouble("avg(a_i)");
+    Double avgf = tuple.getDouble("avg(a_f)");
+    Double count = tuple.getDouble("count(*)");
 
-    t = tuples.get(1);
-    assertTrue(t.getString("id").equals("9"));
+    assertTrue(bucket.equals("hello4"));
+    assertTrue(sumi.longValue() == 15);
+    assertTrue(sumf.doubleValue() == 11.0D);
+    assertTrue(mini.doubleValue() == 4.0D);
+    assertTrue(minf.doubleValue() == 4.0D);
+    assertTrue(maxi.doubleValue() == 11.0D);
+    assertTrue(maxf.doubleValue() == 7.0D);
+    assertTrue(avgi.doubleValue() == 7.5D);
+    assertTrue(avgf.doubleValue() == 5.5D);
+    assertTrue(count.doubleValue() == 2);
 
+    tuple = tuples.get(1);
+    bucket = tuple.getString("a_s");
+    sumi = tuple.getDouble("sum(a_i)");
+    sumf = tuple.getDouble("sum(a_f)");
+    mini = tuple.getDouble("min(a_i)");
+    minf = tuple.getDouble("min(a_f)");
+    maxi = tuple.getDouble("max(a_i)");
+    maxf = tuple.getDouble("max(a_f)");
+    avgi = tuple.getDouble("avg(a_i)");
+    avgf = tuple.getDouble("avg(a_f)");
+    count = tuple.getDouble("count(*)");
 
-    stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), and(eq(a_i, 9),not(eq(a_i, 9)))))");
-    context = new StreamContext();
-    context.setSolrClientCache(solrClientCache);
-    stream.setStreamContext(context);
-    tuples = getTuples(stream);
-
-    assert(tuples.size() == 0);
+    assertTrue(bucket.equals("hello0"));
+    assertTrue(sumi.doubleValue() == 17.0D);
+    assertTrue(sumf.doubleValue() == 18.0D);
+    assertTrue(mini.doubleValue() == 0.0D);
+    assertTrue(minf.doubleValue() == 1.0D);
+    assertTrue(maxi.doubleValue() == 14.0D);
+    assertTrue(maxf.doubleValue() == 10.0D);
+    assertTrue(avgi.doubleValue() == 4.25D);
+    assertTrue(avgf.doubleValue() == 4.5D);
+    assertTrue(count.doubleValue() == 4);
 
+    tuple = tuples.get(2);
+    bucket = tuple.getString("a_s");
+    sumi = tuple.getDouble("sum(a_i)");
+    sumf = tuple.getDouble("sum(a_f)");
+    mini = tuple.getDouble("min(a_i)");
+    minf = tuple.getDouble("min(a_f)");
+    maxi = tuple.getDouble("max(a_i)");
+    maxf = tuple.getDouble("max(a_f)");
+    avgi = tuple.getDouble("avg(a_i)");
+    avgf = tuple.getDouble("avg(a_f)");
+    count = tuple.getDouble("count(*)");
 
-    stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\",having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), and(lteq(a_i, 9), gteq(a_i, 8))))");
-    context = new StreamContext();
-    context.setSolrClientCache(solrClientCache);
-    stream.setStreamContext(context);
-    tuples = getTuples(stream);
+    assertTrue(bucket.equals("hello3"));
+    assertTrue(sumi.doubleValue() == 38.0D);
+    assertTrue(sumf.doubleValue() == 26.0D);
+    assertTrue(mini.doubleValue() == 3.0D);
+    assertTrue(minf.doubleValue() == 3.0D);
+    assertTrue(maxi.doubleValue() == 13.0D);
+    assertTrue(maxf.doubleValue() == 9.0D);
+    assertTrue(avgi.doubleValue() == 9.5D);
+    assertTrue(avgf.doubleValue() == 6.5D);
+    assertTrue(count.doubleValue() == 4);
 
-    assert(tuples.size() == 2);
 
-    t = tuples.get(0);
-    assertTrue(t.getString("id").equals("8"));
+    //Reverse the Sort.
 
-    t = tuples.get(1);
-    assertTrue(t.getString("id").equals("9"));
+    clause = "facet("
+        +   "collection1, "
+        +   "q=\"*:*\", "
+        +   "fl=\"a_s,a_i,a_f\", "
+        +   "sort=\"a_s asc\", "
+        +   "buckets=\"a_s\", "
+        +   "bucketSorts=\"sum(a_i) desc\", "
+        +   "bucketSizeLimit=100, "
+        +   "sum(a_i), sum(a_f), "
+        +   "min(a_i), min(a_f), "
+        +   "max(a_i), max(a_f), "
+        +   "avg(a_i), avg(a_f), "
+        +   "count(*)"
+        + ")";
 
-    stream = factory.constructStream("parallel("+COLLECTIONORALIAS+", workers=2, sort=\"a_f asc\", having(rollup(over=a_f, sum(a_i), search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=a_f)), and(eq(sum(a_i), 9),eq(sum(a_i),9))))");
-    context = new StreamContext();
-    context.setSolrClientCache(solrClientCache);
-    stream.setStreamContext(context);
+    stream = factory.constructStream(clause);
     tuples = getTuples(stream);
 
-    assert(tuples.size() == 1);
 
-    t = tuples.get(0);
-    assertTrue(t.getDouble("a_f") == 10.0D);
-
-    solrClientCache.close();
-  }
+    //Test Long and Double Sums
 
-  @Test
-  public void testFetchStream() throws Exception {
+    tuple = tuples.get(0);
+    bucket = tuple.getString("a_s");
+    sumi = tuple.getDouble("sum(a_i)");
+    sumf = tuple.getDouble("sum(a_f)");
+    mini = tuple.getDouble("min(a_i)");
+    minf = tuple.getDouble("min(a_f)");
+    maxi = tuple.getDouble("max(a_i)");
+    maxf = tuple.getDouble("max(a_f)");
+    avgi = tuple.getDouble("avg(a_i)");
+    avgf = tuple.getDouble("avg(a_f)");
+    count = tuple.getDouble("count(*)");
 
-    SolrClientCache solrClientCache = new SolrClientCache();//TODO share in @Before ; close in @After ?
+    assertTrue(bucket.equals("hello3"));
+    assertTrue(sumi.doubleValue() == 38.0D);
+    assertTrue(sumf.doubleValue() == 26.0D);
+    assertTrue(mini.doubleValue() == 3.0D);
+    assertTrue(minf.doubleValue() == 3.0D);
+    assertTrue(maxi.doubleValue() == 13.0D);
+    assertTrue(maxf.doubleValue() == 9.0D);
+    assertTrue(avgi.doubleValue() == 9.5D);
+    assertTrue(avgf.doubleValue() == 6.5D);
+    assertTrue(count.doubleValue() == 4);
 
-    new UpdateRequest()
-        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1", "subject", "blah blah blah 0")
-        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2", "subject", "blah blah blah 2")
-        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "subject", "blah blah blah 3")
-        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "subject", "blah blah blah 4")
-        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5", "subject", "blah blah blah 1")
-        .add(id, "5", "a_s", "hello3", "a_i", "5", "a_f", "6", "subject", "blah blah blah 5")
-        .add(id, "6", "a_s", "hello4", "a_i", "6", "a_f", "7", "subject", "blah blah blah 6")
-        .add(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "8", "subject", "blah blah blah 7")
-        .add(id, "8", "a_s", "hello3", "a_i", "8", "a_f", "9", "subject", "blah blah blah 8")
-        .add(id, "9", "a_s", "hello0", "a_i", "9", "a_f", "10", "subject", "blah blah blah 9")
-        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+    tuple = tuples.get(1);
+    bucket = tuple.getString("a_s");
+    sumi = tuple.getDouble("sum(a_i)");
+    sumf = tuple.getDouble("sum(a_f)");
+    mini = tuple.getDouble("min(a_i)");
+    minf = tuple.getDouble("min(a_f)");
+    maxi = tuple.getDouble("max(a_i)");
+    maxf = tuple.getDouble("max(a_f)");
+    avgi = tuple.getDouble("avg(a_i)");
+    avgf = tuple.getDouble("avg(a_f)");
+    count = tuple.getDouble("count(*)");
 
-    TupleStream stream;
-    List<Tuple> tuples;
+    assertTrue(bucket.equals("hello0"));
+    assertTrue(sumi.doubleValue() == 17.0D);
+    assertTrue(sumf.doubleValue() == 18.0D);
+    assertTrue(mini.doubleValue() == 0.0D);
+    assertTrue(minf.doubleValue() == 1.0D);
+    assertTrue(maxi.doubleValue() == 14.0D);
+    assertTrue(maxf.doubleValue() == 10.0D);
+    assertTrue(avgi.doubleValue() == 4.25D);
+    assertTrue(avgf.doubleValue() == 4.5D);
+    assertTrue(count.doubleValue() == 4);
 
-    StreamFactory factory = new StreamFactory()
-        .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
-        .withFunctionName("search", CloudSolrStream.class)
-        .withFunctionName("fetch", FetchStream.class);
+    tuple = tuples.get(2);
+    bucket = tuple.getString("a_s");
+    sumi = tuple.getDouble("sum(a_i)");
+    sumf = tuple.getDouble("sum(a_f)");
+    mini = tuple.getDouble("min(a_i)");
+    minf = tuple.getDouble("min(a_f)");
+    maxi = tuple.getDouble("max(a_i)");
+    maxf = tuple.getDouble("max(a_f)");
+    avgi = tuple.getDouble("avg(a_i)");
+    avgf = tuple.getDouble("avg(a_f)");
+    count = tuple.getDouble("count(*)");
 
-    stream = factory.constructStream("fetch("+ COLLECTIONORALIAS +",  search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\")");
-    StreamContext context = new StreamContext();
-    context.setSolrClientCache(solrClientCache);
-    stream.setStreamContext(context);
-    tuples = getTuples(stream);
+    assertTrue(bucket.equals("hello4"));
+    assertTrue(sumi.longValue() == 15);
+    assertTrue(sumf.doubleValue() == 11.0D);
+    assertTrue(mini.doubleValue() == 4.0D);
+    assertTrue(minf.doubleValue() == 4.0D);
+    assertTrue(maxi.doubleValue() == 11.0D);
+    assertTrue(maxf.doubleValue() == 7.0D);
+    assertTrue(avgi.doubleValue() == 7.5D);
+    assertTrue(avgf.doubleValue() == 5.5D);
+    assertTrue(count.doubleValue() == 2);
 
-    assert(tuples.size() == 10);
-    Tuple t = tuples.get(0);
-    assertTrue("blah blah blah 0".equals(t.getString("subject")));
-    t = tuples.get(1);
-    assertTrue("blah blah blah 2".equals(t.getString("subject")));
-    t = tuples.get(2);
-    assertTrue("blah blah blah 3".equals(t.getString("subject")));
-    t = tuples.get(3);
-    assertTrue("blah blah blah 4".equals(t.getString("subject")));
-    t = tuples.get(4);
-    assertTrue("blah blah blah 1".equals(t.getString("subject")));
-    t = tuples.get(5);
-    assertTrue("blah blah blah 5".equals(t.getString("subject")));
-    t = tuples.get(6);
-    assertTrue("blah blah blah 6".equals(t.getString("subject")));
-    t = tuples.get(7);
-    assertTrue("blah blah blah 7".equals(t.getString("subject")));
-    t = tuples.get(8);
-    assertTrue("blah blah blah 8".equals(t.getString("subject")));
-    t = tuples.get(9);
-    assertTrue("blah blah blah 9".equals(t.getString("subject")));
-
-    //Change the batch size
-    stream = factory.constructStream("fetch(" + COLLECTIONORALIAS + ",  search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\")");
-    context = new StreamContext();
-    context.setSolrClientCache(solrClientCache);
-    stream.setStreamContext(context);
-    tuples = getTuples(stream);
 
-    assert(tuples.size() == 10);
-    t = tuples.get(0);
-    assertTrue("blah blah blah 0".equals(t.getString("subject")));
-    t = tuples.get(1);
-    assertTrue("blah blah blah 2".equals(t.getString("subject")));
-    t = tuples.get(2);
-    assertTrue("blah blah blah 3".equals(t.getString("subject")));
-    t = tuples.get(3);
-    assertTrue("blah blah blah 4".equals(t.getString("subject")));
-    t = tuples.get(4);
-    assertTrue("blah blah blah 1".equals(t.getString("subject")));
-    t = tuples.get(5);
-    assertTrue("blah blah blah 5".equals(t.getString("subject")));
-    t = tuples.get(6);
-    assertTrue("blah blah blah 6".equals(t.getString("subject")));
-    t = tuples.get(7);
-    assertTrue("blah blah blah 7".equals(t.getString("subject")));
-    t = tuples.get(8);
-    assertTrue("blah blah blah 8".equals(t.getString("subject")));
-    t = tuples.get(9);
-    assertTrue("blah blah blah 9".equals(t.getString("subject")));
-
-    // SOLR-10404 test that "hello 99" as a value gets escaped
-    new UpdateRequest()
-        .add(id, "99", "a1_s", "hello 99", "a2_s", "hello 99", "subject", "blah blah blah 99")
-        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+    //Test index sort
+    clause = "facet("
+        +   "collection1, "
+        +   "q=\"*:*\", "
+        +   "fl=\"a_s,a_i,a_f\", "
+        +   "sort=\"a_s asc\", "
+        +   "buckets=\"a_s\", "
+        +   "bucketSorts=\"a_s desc\", "
+        +   "bucketSizeLimit=100, "
+        +   "sum(a_i), sum(a_f), "
+        +   "min(a_i), min(a_f), "
+        +   "max(a_i), max(a_f), "
+        +   "avg(a_i), avg(a_f), "
+        +   "count(*)"
+        + ")";
 
-    stream = factory.constructStream("fetch("+ COLLECTIONORALIAS +",  search(" + COLLECTIONORALIAS + ", q=" + id + ":99, fl=\"id,a1_s\", sort=\"id asc\"), on=\"a1_s=a2_s\", fl=\"subject\")");
-    context = new StreamContext();
-    context.setSolrClientCache(solrClientCache);
-    stream.setStreamContext(context);
+    stream = factory.constructStream(clause);
     tuples = getTuples(stream);
 
-    assertEquals(1, tuples.size());
-    t = tuples.get(0);
-    assertTrue("blah blah blah 99".equals(t.getString("subject")));
-
-    solrClientCache.close();
-  }
+    assert(tuples.size() == 3);
 
-  @Test
-  public void testParallelFetchStream() throws Exception {
 
-    new UpdateRequest()
-        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1", "subject", "blah blah blah 0")
-        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2", "subject", "blah blah blah 2")
-        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "subject", "blah blah blah 3")
-        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "subject", "blah blah blah 4")
-        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5", "subject", "blah blah blah 1")
-        .add(id, "5", "a_s", "hello3", "a_i", "5", "a_f", "6", "subject", "blah blah blah 5")
-        .add(id, "6", "a_s", "hello4", "a_i", "6", "a_f", "7", "subject", "blah blah blah 6")
-        .add(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "8", "subject", "blah blah blah 7")
-        .add(id, "8", "a_s", "hello3", "a_i", "8", "a_f", "9", "subject", "blah blah blah 8")
-        .add(id, "9", "a_s", "hello0", "a_i", "9", "a_f", "10", "subject", "blah blah blah 9")
-        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+    tuple = tuples.get(0);
+    bucket = tuple.getString("a_s");
+    sumi = tuple.getDouble("sum(a_i)");
+    sumf = tuple.getDouble("sum(a_f)");
+    mini = tuple.getDouble("min(a_i)");
+    minf = tuple.getDouble("min(a_f)");
+    maxi = tuple.getDouble("max(a_i)");
+    maxf = tuple.getDouble("max(a_f)");
+    avgi = tuple.getDouble("avg(a_i)");
+    avgf = tuple.getDouble("avg(a_f)");
+    count = tuple.getDouble("count(*)");
 
-    StreamContext streamContext = new StreamContext();
-    SolrClientCache solrClientCache = new SolrClientCache();
-    streamContext.setSolrClientCache(solrClientCache);
 
-    TupleStream stream;
-    List<Tuple> tuples;
+    assertTrue(bucket.equals("hello4"));
+    assertTrue(sumi.longValue() == 15);
+    assertTrue(sumf.doubleValue() == 11.0D);
+    assertTrue(mini.doubleValue() == 4.0D);
+    assertTrue(minf.doubleValue() == 4.0D);
+    assertTrue(maxi.doubleValue() == 11.0D);
+    assertTrue(maxf.doubleValue() == 7.0D);
+    assertTrue(avgi.doubleValue() == 7.5D);
+    assertTrue(avgf.doubleValue() == 5.5D);
+    assertTrue(count.doubleValue() == 2);
 
-    StreamFactory factory = new StreamFactory()
-        .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
-        .withFunctionName("search", CloudSolrStream.class)
-        .withFunctionName("parallel", ParallelStream.class)
-        .withFunctionName("fetch", FetchStream.class);
 
-    try {
+    tuple = tuples.get(1);
+    bucket = tuple.getString("a_s");
+    sumi = tuple.getDouble("sum(a_i)");
+    sumf = tuple.getDouble("sum(a_f)");
+    mini = tuple.getDouble("min(a_i)");
+    minf = tuple.getDouble("min(a_f)");
+    maxi = tuple.getDouble("max(a_i)");
+    maxf = tuple.getDouble("max(a_f)");
+    avgi = tuple.getDouble("avg(a_i)");
+    avgf = tuple.getDouble("avg(a_f)");
+    count = tuple.getDouble("count(*)");
 
-      stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTIONORALIAS + ",  search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\"))");
-      stream.setStreamContext(streamContext);
-      tuples = getTuples(stream);
+    assertTrue(bucket.equals("hello3"));
+    assertTrue(sumi.doubleValue() == 38.0D);
+    assertTrue(sumf.doubleValue() == 26.0D);
+    assertTrue(mini.doubleValue() == 3.0D);
+    assertTrue(minf.doubleValue() == 3.0D);
+    assertTrue(maxi.doubleValue() == 13.0D);
+    assertTrue(maxf.doubleValue() == 9.0D);
+    assertTrue(avgi.doubleValue() == 9.5D);
+    assertTrue(avgf.doubleValue() == 6.5D);
+    assertTrue(count.doubleValue() == 4);
 
-      assert (tuples.size() == 10);
-      Tuple t = tuples.get(0);
-      assertTrue("blah blah blah 0".equals(t.getString("subject")));
-      t = tuples.get(1);
-      assertTrue("blah blah blah 2".equals(t.getString("subject")));
-      t = tuples.get(2);
-      assertTrue("blah blah blah 3".equals(t.getString("subject")));
-      t = tuples.get(3);
-      assertTrue("blah blah blah 4".equals(t.getString("subject")));
-      t = tuples.get(4);
-      assertTrue("blah blah blah 1".equals(t.getString("subject")));
-      t = tuples.get(5);
-      assertTrue("blah blah blah 5".equals(t.getString("subject")));
-      t = tuples.get(6);
-      assertTrue("blah blah blah 6".equals(t.getString("subject")));
-      t = tuples.get(7);
-      assertTrue("blah blah blah 7".equals(t.getString("subject")));
-      t = tuples.get(8);
-      assertTrue("blah blah blah 8".equals(t.getString("subject")));
-      t = tuples.get(9);
-      assertTrue("blah blah blah 9".equals(t.getString("subject")));
-
-
-      stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTIONORALIAS + ",  search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\"))");
-      stream.setStreamContext(streamContext);
-      tuples = getTuples(stream);
 
-      assert (tuples.size() == 10);
-      t = tuples.get(0);
-      assertTrue("blah blah blah 0".equals(t.getString("subject")));
-      t = tuples.get(1);
-      assertTrue("blah blah blah 2".equals(t.getString("subject")));
-      t = tuples.get(2);
-      assertTrue("blah blah blah 3".equals(t.getString("subject")));
-      t = tuples.get(3);
-      assertTrue("blah blah blah 4".equals(t.getString("subject")));
-      t = tuples.get(4);
-      assertTrue("blah blah blah 1".equals(t.getString("subject")));
-      t = tuples.get(5);
-      assertTrue("blah blah blah 5".equals(t.getString("subject")));
-      t = tuples.get(6);
-      assertTrue("blah blah blah 6".equals(t.getString("subject")));
-      t = tuples.get(7);
-      assertTrue("blah blah blah 7".equals(t.getString("subject")));
-      t = tuples.get(8);
-      assertTrue("blah blah blah 8".equals(t.getString("subject")));
-      t = tuples.get(9);
-      assertTrue("blah blah blah 9".equals(t.getString("subject")));
-    } finally {
-      solrClientCache.close();
-    }
-  }
+    tuple = tuples.get(2);
+    bucket = tuple.getString("a_s");
+    sumi = tuple.getDouble("sum(a_i)");
+    sumf = tuple.getDouble("sum(a_f)");
+    mini = tuple.getDouble("min(a_i)");
+    minf = tuple.getDouble("min(a_f)");
+    maxi = tuple.getDouble("max(a_i)");
+    maxf = tuple.getDouble("max(a_f)");
+    avgi = tuple.getDouble("avg(a_i)");
+    avgf = tuple.getDouble("avg(a_f)");
+    count = tuple.getDouble("count(*)");
 
+    assertTrue(bucket.equals("hello0"));
+    assertTrue(sumi.doubleValue() == 17.0D);
+    assertTrue(sumf.doubleValue() == 18.0D);
+    assertTrue(mini.doubleValue() == 0.0D);
+    assertTrue(minf.doubleValue() == 1.0D);
+    assertTrue(maxi.doubleValue() == 14.0D);
+    assertTrue(maxf.doubleValue() == 10.0D);
+    assertTrue(avgi.doubleValue() == 4.25D);
+    assertTrue(avgf.doubleValue() == 4.5D);
+    assertTrue(count.doubleValue() == 4);
 
+    //Test index sort
 
+    clause = "facet("
+        +   "collection1, "
+        +   "q=\"*:*\", "
+        +   "fl=\"a_s,a_i,a_f\", "
+        +   "sort=\"a_s asc\", "
+        +   "buckets=\"a_s\", "
+        +   "bucketSorts=\"a_s asc\", "
+        +   "bucketSizeLimit=100, "
+        +   "sum(a_i), sum(a_f), "
+        +   "min(a_i), min(a_f), "
+        +   "max(a_i), max(a_f), "
+        +   "avg(a_i), avg(a_f), "
+        +   "count(*)"
+        + ")";
 
+    stream = factory.constructStream(clause);
+    tuples = getTuples(stream);
 
-  @Test
-  public void testDaemonStream() throws Exception {
+    assert(tuples.size() == 3);
 
-    new UpdateRequest()
-        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
-        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
-        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
-        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
-        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
-        .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
-        .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
-        .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
-        .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
-        .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
-        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-    StreamFactory factory = new StreamFactory()
-        .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
-        .withFunctionName("search", CloudSolrStream.class)
-        .withFunctionName("rollup", RollupStream.class)
-        .withFunctionName("sum", SumMetric.class)
-        .withFunctionName("min", MinMetric.class)
-        .withFunctionName("max", MaxMetric.class)
-        .withFunctionName("avg", MeanMetric.class)
-        .withFunctionName("count", CountMetric.class)
-        .withFunctionName("daemon", DaemonStream.class);
-
-    StreamExpression expression;
-    DaemonStream daemonStream;
-
-    expression = StreamExpressionParser.parse("daemon(rollup("
-        + "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"a_i,a_s\", sort=\"a_s asc\"),"
-        + "over=\"a_s\","
-        + "sum(a_i)"
-        + "), id=\"test\", runInterval=\"1000\", queueSize=\"9\")");
-    daemonStream = (DaemonStream)factory.constructStream(expression);
-    StreamContext streamContext = new StreamContext();
-    SolrClientCache solrClientCache = new SolrClientCache();
-    streamContext.setSolrClientCache(solrClientCache);
-    daemonStream.setStreamContext(streamContext);
-    try {
-      //Test Long and Double Sums
-
-      daemonStream.open(); // This will start the daemon thread
-
-      for (int i = 0; i < 4; i++) {
-        Tuple tuple = daemonStream.read(); // Reads from the queue
-        String bucket = tuple.getString("a_s");
-        Double sumi = tuple.getDouble("sum(a_i)");
-
-        //System.out.println("#################################### Bucket 1:"+bucket);
-        assertTrue(bucket.equals("hello0"));
-        assertTrue(sumi.doubleValue() == 17.0D);
-
-        tuple = daemonStream.read();
-        bucket = tuple.getString("a_s");
-        sumi = tuple.getDouble("sum(a_i)");
-
-        //System.out.println("#################################### Bucket 2:"+bucket);
-        assertTrue(bucket.equals("hello3"));
-        assertTrue(sumi.doubleValue() == 38.0D);
-
-        tuple = daemonStream.read();
-        bucket = tuple.getString("a_s");
-        sumi = tuple.getDouble("sum(a_i)");
-        //System.out.println("#################################### Bucket 3:"+bucket);
-        assertTrue(bucket.equals("hello4"));
-        assertTrue(sumi.longValue() == 15);
-      }
-
-      //Now lets wait until the internal queue fills up
-
-      while (daemonStream.remainingCapacity() > 0) {
-        try {
-          Thread.sleep(1000);
-        } catch (Exception e) {
+    tuple = tuples.get(0);
+    bucket = tuple.getString("a_s");
+    sumi = tuple.getDouble("sum(a_i)");
+    sumf = tuple.getDouble("sum(a_f)");
+    mini = tuple.getDouble("min(a_i)");
+    minf = tuple.getDouble("min(a_f)");
+    maxi = tuple.getDouble("max(a_i)");
+    maxf = tuple.getDouble("max(a_f)");
+    avgi = tuple.getDouble("avg(a_i)");
+    avgf = tuple.getDouble("avg(a_f)");
+    count = tuple.getDouble("count(*)");
 
-        }
-      }
+    assertTrue(bucket.equals("hello0"));
+    assertTrue(sumi.doubleValue() == 17.0D);
+    assertTrue(sumf.doubleValue() == 18.0D);
+    assertTrue(mini.doubleValue() == 0.0D);
+    assertTrue(minf.doubleValue() == 1.0D);
+    assertTrue(maxi.doubleValue() == 14.0D);
+    assertTrue(maxf.doubleValue() == 10.0D);
+    assertTrue(avgi.doubleValue() == 4.25D);
+    assertTrue(avgf.doubleValue() == 4.5D);
+    assertTrue(count.doubleValue() == 4);
 
-      //OK capacity is full, let's index a new doc
 
-      new UpdateRequest()
-          .add(id, "10", "a_s", "hello0", "a_i", "1", "a_f", "10")
-          .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+    tuple = tuples.get(1);
+    bucket = tuple.getString("a_s");
+    sumi = tuple.getDouble("sum(a_i)");
+    sumf = tuple.getDouble("sum(a_f)");
+    mini = tuple.getDouble("min(a_i)");
+    minf = tuple.getDouble("min(a_f)");
+    maxi = tuple.getDouble("max(a_i)");
+    maxf = tuple.getDouble("max(a_f)");
+    avgi = tuple.getDouble("avg(a_i)");
+    avgf = tuple.getDouble("avg(a_f)");
+    count = tuple.getDouble("count(*)");
 
-      //Now lets clear the existing docs in the queue 9, plus 3 more to get passed the run that was blocked. The next run should
-      //have the tuples with the updated count.
-      for (int i = 0; i < 12; i++) {
-        daemonStream.read();
-      }
+    assertTrue(bucket.equals("hello3"));
+    assertTrue(sumi.doubleValue() == 38.0D);
+    assertTrue(sumf.doubleValue() == 26.0D);
+    assertTrue(mini.doubleValue() == 3.0D);
+    assertTrue(minf.doubleValue() == 3.0D);
+    assertTrue(maxi.doubleValue() == 13.0D);
+    assertTrue(maxf.doubleValue() == 9.0D);
+    assertTrue(avgi.doubleValue() == 9.5D);
+    assertTrue(avgf.doubleValue() == 6.5D);
+    assertTrue(count.doubleValue() == 4);
 
-      //And rerun the loop. It should have a new count for hello0
-      for (int i = 0; i < 4; i++) {
-        Tuple tuple = daemonStream.read(); // Reads from the queue
-        String bucket = tuple.getString("a_s");
-        Double sumi = tuple.getDouble("sum(a_i)");
-
-        //System.out.println("#################################### Bucket 1:"+bucket);
-        assertTrue(bucket.equals("hello0"));
-        assertTrue(sumi.doubleValue() == 18.0D);
-
-        tuple = daemonStream.read();
-        bucket = tuple.getString("a_s");
-        sumi = tuple.getDouble("sum(a_i)");
-
-        //System.out.println("#################################### Bucket 2:"+bucket);
-        assertTrue(bucket.equals("hello3"));
-        assertTrue(sumi.doubleValue() == 38.0D);
-
-        tuple = daemonStream.read();
-        bucket = tuple.getString("a_s");
-        sumi = tuple.getDouble("sum(a_i)");
-        //System.out.println("#################################### Bucket 3:"+bucket);
-        assertTrue(bucket.equals("hello4"));
-        assertTrue(sumi.longValue() == 15);
-      }
-    } finally {
-      daemonStream.close(); //This should stop the daemon thread
-      solrClientCache.close();
-    }
-  }
 
+    tuple = tuples.get(2);
+    bucket = tuple.getString("a_s");
+    sumi = tuple.getDouble("sum(a_i)");
+    sumf = tuple.getDouble("sum(a_f)");
+    mini = tuple.getDouble("min(a_i)");
+    minf = tuple.getDouble("min(a_f)");
+    maxi = tuple.getDouble("max(a_i)");
+    maxf = tuple.getDouble("max(a_f)");
+    avgi = tuple.getDouble("avg(a_i)");
+    avgf = tuple.getDouble("avg(a_f)");
+    count = tuple.getDouble("count(*)");
 
-  @Test
-  public void testTerminatingDaemonStream() throws Exception {
-    Assume.assumeTrue(!useAlias);
+    assertTrue(bucket.equals("hello4"));
+    assertTrue(sumi.longValue() == 15);
+    assertTrue(sumf.doubleValue() == 11.0D);
+    assertTrue(mini.doubleValue() == 4.0D);
+    assertTrue(minf.doubleValue() == 4.0D);
+    assertTrue(maxi.doubleValue() == 11.0D);
+    assertTrue(maxf.doubleValue() == 7.0D);
+    assertTrue(avgi.doubleValue() == 7.5D);
+    assertTrue(avgf.doubleValue() == 5.5D);
+    assertTrue(count.doubleValue() == 2);
 
-    new UpdateRequest()
-        .add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "1")
-        .add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "2")
-        .add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3")
-        .add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4")
-        .add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "5")
-        .add(id, "5", "a_s", "hello", "a_i", "10", "a_f", "6")
-        .add(id, "6", "a_s", "hello", "a_i", "11", "a_f", "7")
-        .add(id, "7", "a_s", "hello", "a_i", "12", "a_f", "8")
-        .add(id, "8", "a_s", "hello", "a_i", "13", "a_f", "9")
-        .add(id, "9", "a_s", "hello", "a_i", "14", "a_f", "10")
-        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+    //Test zero result facets
+    clause = "facet("
+        +   "collection1, "
+        +   "q=\"blahhh\", "
+        +   "fl=\"a_s,a_i,a_f\", "
+        +   "sort=\"a_s asc\", "
+        +   "buckets=\"a_s\", "
+        +   "bucketSorts=\"a_s asc\", "
+        +   "bucketSizeLimit=100, "
+        +   "sum(a_i), sum(a_f), "
+        +   "min(a_i), min(a_f), "
+        +   "max(a_i), max(a_f), "
+        +   "avg(a_i), avg(a_f), "
+        +   "count(*)"
+        + ")";
 
-    StreamFactory factory = new StreamFactory()
-        .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
-        .withFunctionName("topic", TopicStream.class)
-        .withFunctionName("daemon", DaemonStream.class);
+    stream = factory.constructStream(clause);
+    tuples = getTuples(stream);
 
-    StreamExpression expression;
-    DaemonStream daemonStream;
+    assert(tuples.size() == 0);
 
-    SolrClientCache cache = new SolrClientCache();
-    StreamContext context = new StreamContext();
-    context.setSolrClientCache(cache);
-    expression = StreamExpressionParser.parse("daemon(topic("+ COLLECTIONORALIAS +","+ COLLECTIONORALIAS +", q=\"a_s:hello\", initialCheckpoint=0, id=\"topic1\", rows=2, fl=\"id\""
-        + "), id=test, runInterval=1000, terminate=true, queueSize=50)");
-    daemonStream = (DaemonStream)factory.constructStream(expression);
-    daemonStream.setStreamContext(context);
-
-    List<Tuple> tuples = getTuples(daemonStream);
-    assertTrue(tuples.size() == 10);
-    cache.close();
   }
 
-
   @Test
-  public void testRollupStream() throws Exception {
+  public void testSubFacetStream() throws Exception {
 
     new UpdateRequest()
-        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
-        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
-        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
-        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
-        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
-        .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
-        .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
-        .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
-        .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
-        .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
+        .add(id, "0", "level1_s", "hello0", "level2_s", "a", "a_i", "0", "a_f", "1")
+        .add(id, "2", "level1_s", "hello0", "level2_s", "a", "a_i", "2", "a_f", "2")
+        .add(id, "3", "level1_s", "hello3", "level2_s", "a", "a_i", "3", "a_f", "3")
+        .add(id, "4", "level1_s", "hello4", "level2_s", "a", "a_i", "4", "a_f", "4")
+        .add(id, "1", "level1_s", "hello0", "level2_s", "b", "a_i", "1", "a_f", "5")
+        .add(id, "5", "level1_s", "hello3", "level2_s", "b", "a_i", "10", "a_f", "6")
+        .add(id, "6", "level1_s", "hello4", "level2_s", "b", "a_i", "11", "a_f", "7")
+        .add(id, "7", "level1_s", "hello3", "level2_s", "b", "a_i", "12", "a_f", "8")
+        .add(id, "8", "level1_s", "hello3", "level2_s", "b", "a_i", "13", "a_f", "9")
+        .add(id, "9", "level1_s", "hello0", "level2_s", "b", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
+    String clause;
+    TupleStream stream;
+    List<Tuple> tuples;
+    
     StreamFactory factory = new StreamFactory()
-      .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
-      .withFunctionName("search", CloudSolrStream.class)
-      .withFunctionName("rollup", RollupStream.class)
+      .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
+      .withFunctionName("facet", FacetStream.class)
       .withFunctionName("sum", SumMetric.class)
       .withFunctionName("min", MinMetric.class)
       .withFunctionName("max", MaxMetric.class)
       .withFunctionName("avg", MeanMetric.class)
-      .withFunctionName("count", CountMetric.class);     
+      .withFunctionName("count", CountMetric.class);
     
-    StreamExpression expression;
-    TupleStream stream;
-    List<Tuple> tuples;
-    StreamContext streamContext = new StreamContext();
-    SolrClientCache solrClientCache = new SolrClientCache();
-    streamContext.setSolrClientCache(solrClientCache);
-    try {
-      expression = StreamExpressionParser.parse("rollup("
-          + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"a_s,a_i,a_f\", sort=\"a_s asc\"),"
-          + "over=\"a_s\","
-          + "sum(a_i),"
-          + "sum(a_f),"
-          + "min(a_i),"
-          + "min(a_f),"
-          + "max(a_i),"
-          + "max(a_f),"
-          + "avg(a_i),"
-          + "avg(a_f),"
-          + "count(*),"
-          + ")");
-      stream = factory.constructStream(expression);
-      stream.setStreamContext(streamContext);
-      tuples = getTuples(stream);
-
-      assert (tuples.size() == 3);
+    // Basic test
+    clause = "facet("
+              +   "collection1, "
+              +   "q=\"*:*\", "
+              +   "buckets=\"level1_s, level2_s\", "
+              +   "bucketSorts=\"sum(a_i) desc, sum(a_i) desc)\", "
+              +   "bucketSizeLimit=100, "
+              +   "sum(a_i), count(*)"
+              + ")";
+    
+    stream = factory.constructStream(clause);
+    tuples = getTuples(stream);
 
-      //Test Long and Double Sums
+    assert(tuples.size() == 6);
 
-      Tuple tuple = tuples.get(0);
-      String bucket = tuple.getString("a_s");
-      Double sumi = tuple.getDouble("sum(a_i)");
-      Double sumf = tuple.getDouble("sum(a_f)");
-      Double mini = tuple.getDouble("min(a_i)");
-      Double minf = tuple.getDouble("min(a_f)");
-      Double maxi = tuple.getDouble("max(a_i)");
-      Double maxf = tuple.getDouble("max(a_f)");
-      Double avgi = tuple.getDouble("avg(a_i)");
-      Double avgf = tuple.getDouble("avg(a_f)");
-      Double count = tuple.getDouble("count(*)");
+    Tuple tuple = tuples.get(0);
+    String bucket1 = tuple.getString("level1_s");
+    String bucket2 = tuple.getString("level2_s");
+    Double sumi = tuple.getDouble("sum(a_i)");
+    Double count = tuple.getDouble("count(*)");
 
-      assertTrue(bucket.equals("hello0"));
-      assertTrue(sumi.doubleValue() == 17.0D);
-      assertTrue(sumf.doubleValue() == 18.0D);
-      assertTrue(mini.doubleValue() == 0.0D);
-      assertTrue(minf.doubleValue() == 1.0D);
-      assertTrue(maxi.doubleValue() == 14.0D);
-      assertTrue(maxf.doubleValue() == 10.0D);
-      assertTrue(avgi.doubleValue() == 4.25D);
-      assertTrue(avgf.doubleValue() == 4.5D);
-      assertTrue(count.doubleValue() == 4);
+    assertTrue(bucket1.equals("hello3"));
+    assertTrue(bucket2.equals("b"));
+    assertTrue(sumi.longValue() == 35);
+    assertTrue(count.doubleValue() == 3);
 
-      tuple = tuples.get(1);
-      bucket = tuple.getString("a_s");
-      sumi = tuple.getDouble("sum(a_i)");
-      sumf = tuple.getDouble("sum(a_f)");
-      mini = tuple.getDouble("min(a_i)");
-      minf = tuple.getDouble("min(a_f)");
-      maxi = tuple.getDouble("max(a_i)");
-      maxf = tuple.getDouble("max(a_f)");
-      avgi = tuple.getDouble("avg(a_i)");
-      avgf = tuple.getDouble("avg(a_f)");
-      count = tuple.getDouble("count(*)");
+    tuple = tuples.get(1);
+    bucket1 = tuple.getString("level1_s");
+    bucket2 = tuple.getString("level2_s");
+    sumi = tuple.getDouble("sum(a_i)");
+    count = tuple.getDouble("count(*)");
 
-      assertTrue(bucket.equals("hello3"));
-      assertTrue(sumi.doubleValue() == 38.0D);
-      assertTrue(sumf.doubleValue() == 26.0D);
-      assertTrue(mini.doubleValue() == 3.0D);
-      assertTrue(minf.doubleValue() == 3.0D);
-      assertTrue(maxi.doubleValue() == 13.0D);
-      assertTrue(maxf.doubleValue() == 9.0D);
-      assertTrue(avgi.doubleValue() == 9.5D);
-      assertTrue(avgf.doubleValue() == 6.5D);
-      assertTrue(count.doubleValue() == 4);
-
-      tuple = tuples.get(2);
-      bucket = tuple.getString("a_s");
-      sumi = tuple.getDouble("sum(a_i)");
-      sumf = tuple.getDouble("sum(a_f)");
-      mini = tuple.getDouble("min(a_i)");
-      minf = tuple.getDouble("min(a_f)");
-      maxi = tuple.getDouble("max(a_i)");
-      maxf = tuple.getDouble("max(a_f)");
-      avgi = tuple.getDouble("avg(a_i)");
-      avgf = tuple.getDouble("avg(a_f)");
-      count = tuple.getDouble("count(*)");
+    assertTrue(bucket1.equals("hello0"));
+    assertTrue(bucket2.equals("b"));
+    assertTrue(sumi.longValue() == 15);
+    assertTrue(count.doubleValue() == 2);
 
-      assertTrue(bucket.equals("hello4"));
-      assertTrue(sumi.longValue() == 15);
-      assertTrue(sumf.doubleValue() == 11.0D);
-      assertTrue(mini.doubleValue() == 4.0D);
-      assertTrue(minf.doubleValue() == 4.0D);
-      assertTrue(maxi.doubleValue() == 11.0D);
-      assertTrue(maxf.doubleValue() == 7.0D);
-      assertTrue(avgi.doubleValue() == 7.5D);
-      assertTrue(avgf.doubleValue() == 5.5D);
-      assertTrue(count.doubleValue() == 2);
+    tuple = tuples.get(2);
+    bucket1 = tuple.getString("level1_s");
+    bucket2 = tuple.getString("level2_s");
+    sumi = tuple.getDouble("sum(a_i)");
+    count = tuple.getDouble("count(*)");
 
-    } finally {
-      solrClientCache.close();
-    }
-  }
+    assertTrue(bucket1.equals("hello4"));
+    assertTrue(bucket2.equals("b"));
+    assertTrue(sumi.longValue() == 11);
+    assertTrue(count.doubleValue() == 1);
 
-  @Test
-  public void testStatsStream() throws Exception {
+    tuple = tuples.get(3);
+    bucket1 = tuple.getString("level1_s");
+    bucket2 = tuple.getString("level2_s");
+    sumi = tuple.getDouble("sum(a_i)");
+    count = tuple.getDouble("count(*)");
 
-    new UpdateRequest()
-        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
-        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
-        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
-        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
-        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
-        .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
-        .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
-        .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
-        .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
-        .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
-        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
-
-    StreamFactory factory = new StreamFactory()
-    .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
-    .withFunctionName("stats", StatsStream.class)
-    .withFunctionName("sum", SumMetric.class)
-    .withFunctionName("min", MinMetric.class)
-    .withFunctionName("max", MaxMetric.class)
-    .withFunctionName("avg", MeanMetric.class)
-    .withFunctionName("count", CountMetric.class);     
-  
-    StreamExpression expression;
-    TupleStream stream;
-    List<Tuple> tuples;
-    StreamContext streamContext = new StreamContext();
-    SolrClientCache cache = new SolrClientCache();
-    try {
-      streamContext.setSolrClientCache(cache);
-      String expr = "stats(" + COLLECTIONORALIAS + ", q=*:*, sum(a_i), sum(a_f), min(a_i), min(a_f), max(a_i), max(a_f), avg(a_i), avg(a_f), count(*))";
-      expression = StreamExpressionParser.parse(expr);
-      stream = factory.constructStream(expression);
-      stream.setStreamContext(streamContext);
-
-      tuples = getTuples(stream);
-
-      assert (tuples.size() == 1);
-
-      //Test Long and Double Sums
-
-      Tuple tuple = tuples.get(0);
-
-      Double sumi = tuple.getDouble("sum(a_i)");
-      Double sumf = tuple.getDouble("sum(a_f)");
-      Double mini = tuple.getDouble("min(a_i)");
-      Double minf = tuple.getDouble("min(a_f)");
-      Double maxi = tuple.getDouble("max(a_i)");
-      Double maxf = tuple.getDouble("max(a_f)");
-      Double avgi = tuple.getDouble("avg(a_i)");
-      Double avgf = tuple.getDouble("avg(a_f)");
-      Double count = tuple.getDouble("count(*)");
-
-      assertTrue(sumi.longValue() == 70);
-      assertTrue(sumf.doubleValue() == 55.0D);
-      assertTrue(mini.doubleValue() == 0.0D);
-      assertTrue(minf.doubleValue() == 1.0D);
-      assertTrue(maxi.doubleValue() == 14.0D);
-      assertTrue(maxf.doubleValue() == 10.0D);
-      assertTrue(avgi.doubleValue() == 7.0D);
-      assertTrue(avgf.doubleValue() == 5.5D);
-      assertTrue(count.doubleValue() == 10);
-
-
-      //Test with shards parameter
-      List<String> shardUrls = TupleStream.getShards(cluster.getZkServer().getZkAddress(), COLLECTIONORALIAS, streamContext);
-      expr = "stats(myCollection, q=*:*, sum(a_i), sum(a_f), min(a_i), min(a_f), max(a_i), max(a_f), avg(a_i), avg(a_f), count(*))";
-      Map<String, List<String>> shardsMap = new HashMap();
-      shardsMap.put("myCollection", shardUrls);
-      StreamContext context = new StreamContext();
-      context.put("shards", shardsMap);
-      context.setSolrClientCache(cache);
-      stream = factory.constructStream(expr);
-      stream.setStreamContext(context);
-
-      tuples = getTuples(stream);
-
-      assert (tuples.size() == 1);
-
-      //Test Long and Double Sums
-
-      tuple = tuples.get(0);
-
-      sumi = tuple.getDouble("sum(a_i)");
-      sumf = tuple.getDouble("sum(a_f)");
-      mini = tuple.getDouble("min(a_i)");
-      minf = tuple.getDouble("min(a_f)");
-      maxi = tuple.getDouble("max(a_i)");
-      maxf = tuple.getDouble("max(a_f)");
-      avgi = tuple.getDouble("avg(a_i)");
-      avgf = tuple.getDouble("avg(a_f)");
-      count = tuple.getDouble("count(*)");
-
-      assertTrue(sumi.longValue() == 70);
-      assertTrue(sumf.doubleValue() == 55.0D);
-      assertTrue(mini.doubleValue() == 0.0D);
-      assertTrue(minf.doubleValue() == 1.0D);
-      assertTrue(maxi.doubleValue() == 14.0D);
-      assertTrue(maxf.doubleValue() == 10.0D);
-      assertTrue(avgi.doubleValue() == 7.0D);
-      assertTrue(avgf.doubleValue() == 5.5D);
-      assertTrue(count.doubleValue() == 10);
-
-      //Execersise the /stream hander
-
-      //Add the shards http parameter for the myCollection
-      StringBuilder buf = new StringBuilder();
-      for (String shardUrl : shardUrls) {
-        if (buf.length() > 0) {
-          buf.append(",");
-        }
-        buf.append(shardUrl);
-      }
-
-      ModifiableSolrParams solrParams = new ModifiableSolrParams();
-      solrParams.add("qt", "/stream");
-      solrParams.add("expr", expr);
-      solrParams.add("myCollection.shards", buf.toString());
-      SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams);
-      tuples = getTuples(solrStream);
-      assert (tuples.size() == 1);
-
-      tuple =tuples.get(0);
-
-      sumi = tuple.getDouble("sum(a_i)");
-      sumf = tuple.getDouble("sum(a_f)");
-      mini = tuple.getDouble("min(a_i)");
-      minf = tuple.getDouble("min(a_f)");
-      maxi = tuple.getDouble("max(a_i)");
-      maxf = tuple.getDouble("max(a_f)");
-      avgi = tuple.getDouble("avg(a_i)");
-      avgf = tuple.getDouble("avg(a_f)");
-      count = tuple.getDouble("count(*)");
-
-      assertTrue(sumi.longValue() == 70);
-      assertTrue(sumf.doubleValue() == 55.0D);
-      assertTrue(mini.doubleValue() == 0.0D);
-      assertTrue(minf.doubleValue() == 1.0D);
-      assertTrue(maxi.doubleValue() == 14.0D);
-      assertTrue(maxf.doubleValue() == 10.0D);
-      assertTrue(avgi.doubleValue() == 7.0D);
-      assertTrue(avgf.doubleValue() == 5.5D);
-      assertTrue(count.doubleValue() == 10);
-      //Add a negative test to prove that it cannot find slices if shards parameter is removed
-
-      try {
-        ModifiableSolrParams solrParamsBad = new ModifiableSolrParams();
-        solrParamsBad.add("qt", "/stream");
-        solrParamsBad.add("expr", expr);
-        solrStream = new SolrStream(shardUrls.get(0), solrParamsBad);
-        tuples = getTuples(solrStream);
-        throw new Exception("Exception should have been thrown above");
-      } catch (IOException e) {
-        assertTrue(e.getMessage().contains("Collection not found: myCollection"));
-      }
-    } finally {
-      cache.close();
-    }
-  }
-
-  @Test
-  public void testParallelUniqueStream() throws Exception {
-
-    new UpdateRequest()
-        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
-        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
-        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
-        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
-        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
-        .add(id, "5", "a_s", "hello1", "a_i", "10", "a_f", "1")
-        .add(id, "6", "a_s", "hello1", "a_i", "11", "a_f", "5")
-        .add(id, "7", "a_s", "hello1", "a_i", "12", "a_f", "5")
-        .add(id, "8", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
-
-    String zkHost = cluster.getZkServer().getZkAddress();
-    StreamFactory streamFactory = new StreamFactory().withCollectionZkHost(COLLECTIONORALIAS, zkHost)
-        .withFunctionName("search", CloudSolrStream.class)
-        .withFunctionName("unique", UniqueStream.class)
-        .withFunctionName("top", RankStream.class)
-        .withFunctionName("group", ReducerStream.class)
-        .withFunctionName("parallel", ParallelStream.class);
-    StreamContext streamContext = new StreamContext();
-    SolrClientCache solrClientCache = new SolrClientCache();
-    streamContext.setSolrClientCache(solrClientCache);
-
-
-
-    try {
-
-      ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"), over=\"a_f\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_f asc\")");
-      pstream.setStreamContext(streamContext);
-      List<Tuple> tuples = getTuples(pstream);
-      assert (tuples.size() == 5);
-      assertOrder(tuples, 0, 1, 3, 4, 6);
-
-      //Test the eofTuples
-
-      Map<String, Tuple> eofTuples = pstream.getEofTuples();
-      assert (eofTuples.size() == 2); //There should be an EOF tuple for each worker.
-    } finally {
-      solrClientCache.close();
-    }
-  }
-
-  @Test
-  public void testParallelShuffleStream() throws Exception {
-
-    new UpdateRequest()
-        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
-        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
-        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
-        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
-        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
-        .add(id, "5", "a_s", "hello1", "a_i", "10", "a_f", "1")
-        .add(id, "6", "a_s", "hello1", "a_i", "11", "a_f", "5")
-        .add(id, "7", "a_s", "hello1", "a_i", "12", "a_f", "5")
-        .add(id, "8", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "9", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "10", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "11", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "12", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "13", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "14", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "15", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "16", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "17", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "18", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "19", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "20", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "21", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "22", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "23", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "24", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "25", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "26", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "27", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "28", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "29", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "30", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "31", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "32", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "33", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "34", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "35", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "36", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "37", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "38", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "39", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "40", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "41", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "42", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "43", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "44", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "45", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "46", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "47", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "48", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "49", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "50", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "51", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "52", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "53", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "54", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "55", "a_s", "hello1", "a_i", "13", "a_f", "4")
-        .add(id, "56", "a_s", "hello1", "a_i", "13", "a_f", "1000")
-
-        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
-
-    StreamContext streamContext = new StreamContext();
-    SolrClientCache solrClientCache = new SolrClientCache();
-    streamContext.setSolrClientCache(solrClientCache);
-
-    String zkHost = cluster.getZkServer().getZkAddress();
-    StreamFactory streamFactory = new StreamFactory().withCollectionZkHost(COLLECTIONORALIAS, zkHost)
-        .withFunctionName("shuffle", ShuffleStream.class)
-        .withFunctionName("unique", UniqueStream.class)
-        .withFunctionName("parallel", ParallelStream.class);
-
-    try {
-      ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", unique(shuffle(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"), over=\"a_f\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_f asc\")");
-      pstream.setStreamFactory(streamFactory);
-      pstream.setStreamContext(streamContext);
-     

<TRUNCATED>