Posted to commits@lucene.apache.org by ab...@apache.org on 2018/04/10 14:12:03 UTC

[24/50] lucene-solr:jira/solr-12181: SOLR-12183: Refactor Streaming Expression test cases

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/80375acb/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
new file mode 100644
index 0000000..2afc74f
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
@@ -0,0 +1,3954 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.io.SolrClientCache;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.eval.AddEvaluator;
+import org.apache.solr.client.solrj.io.eval.AndEvaluator;
+import org.apache.solr.client.solrj.io.eval.EqualToEvaluator;
+import org.apache.solr.client.solrj.io.eval.GreaterThanEqualToEvaluator;
+import org.apache.solr.client.solrj.io.eval.GreaterThanEvaluator;
+import org.apache.solr.client.solrj.io.eval.IfThenElseEvaluator;
+import org.apache.solr.client.solrj.io.eval.LessThanEqualToEvaluator;
+import org.apache.solr.client.solrj.io.eval.LessThanEvaluator;
+import org.apache.solr.client.solrj.io.eval.NotEvaluator;
+import org.apache.solr.client.solrj.io.eval.OrEvaluator;
+import org.apache.solr.client.solrj.io.eval.RawValueEvaluator;
+import org.apache.solr.client.solrj.io.ops.ConcatOperation;
+import org.apache.solr.client.solrj.io.ops.GroupOperation;
+import org.apache.solr.client.solrj.io.ops.ReplaceOperation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.cloud.AbstractDistribZkTestBase;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+@Slow
+@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40","Lucene41","Lucene42","Lucene45"})
+public class StreamDecoratorTest extends SolrCloudTestCase {
+
+  private static final String COLLECTIONORALIAS = "collection1";
+  private static final int TIMEOUT = DEFAULT_TIMEOUT;
+  private static final String id = "id";
+
+  private static boolean useAlias;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(4)
+        .addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
+        .addConfig("ml", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("ml").resolve("conf"))
+        .configure();
+
+    String collection;
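+    // Half of the runs go through a collection alias rather than the collection
+    // itself, so every decorator below is also exercised via alias resolution.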
+    useAlias = random().nextBoolean();
+    if (useAlias) {
+      collection = COLLECTIONORALIAS + "_collection";
+    } else {
+      collection = COLLECTIONORALIAS;
+    }
+
+    CollectionAdminRequest.createCollection(collection, "conf", 2, 1).process(cluster.getSolrClient());
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish(collection, cluster.getSolrClient().getZkStateReader(),
+        false, true, TIMEOUT);
+    if (useAlias) {
+      CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection).process(cluster.getSolrClient());
+    }
+  }
+
+  @Before
+  public void cleanIndex() throws Exception {
+    new UpdateRequest()
+        .deleteByQuery("*:*")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+  }
+
+  @Test
+  public void testUniqueStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    StreamExpression expression;
+    TupleStream stream;
+    List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    StreamFactory factory = new StreamFactory()
+      .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+      .withFunctionName("search", CloudSolrStream.class)
+      .withFunctionName("unique", UniqueStream.class);
+
+    try {
+      // Basic test
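+      // unique(..., over="a_f") emits only the first tuple seen for each a_f value;
+      // ids 0 and 2 both have a_f=0, so only id 0 survives under the ascending sort.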
+      expression = StreamExpressionParser.parse("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f\")");
+      stream = new UniqueStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 4);
+      assertOrder(tuples, 0, 1, 3, 4);
+
+      // Basic test desc
+      expression = StreamExpressionParser.parse("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc, a_i desc\"), over=\"a_f\")");
+      stream = new UniqueStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 4);
+      assertOrder(tuples, 4, 3, 1, 2);
+
+      // Basic w/multi comp
+      expression = StreamExpressionParser.parse("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f, a_i\")");
+      stream = new UniqueStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 5);
+      assertOrder(tuples, 0, 2, 1, 3, 4);
+
+      // full factory w/multi comp
+      stream = factory.constructStream("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f, a_i\")");
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 5);
+      assertOrder(tuples, 0, 2, 1, 3, 4);
+    } finally {
+      solrClientCache.close();
+    }
+  }
+
+  @Test
+  public void testSortStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
+        .add(id, "5", "a_s", "hello1", "a_i", "1", "a_f", "2")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    TupleStream stream;
+    List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+    try {
+      StreamFactory factory = new StreamFactory()
+          .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+          .withFunctionName("search", CloudSolrStream.class)
+          .withFunctionName("sort", SortStream.class);
+
+      // Basic test
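+      // sort() reads the whole underlying stream and re-sorts it in memory by the
+      // "by" criteria; ids 1 and 5 tie on a_i=1 and keep their underlying a_f order.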
+      stream = factory.constructStream("sort(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc\")");
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assert (tuples.size() == 6);
+      assertOrder(tuples, 0, 1, 5, 2, 3, 4);
+
+      // Basic test desc
+      stream = factory.constructStream("sort(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i desc\")");
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assert (tuples.size() == 6);
+      assertOrder(tuples, 4, 3, 2, 1, 5, 0);
+
+      // Basic w/multi comp
+      stream = factory.constructStream("sort(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc, a_f desc\")");
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assert (tuples.size() == 6);
+      assertOrder(tuples, 0, 5, 1, 2, 3, 4);
+    } finally {
+      solrClientCache.close();
+    }
+  }
+
+  @Test
+  public void testNullStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
+        .add(id, "5", "a_s", "hello1", "a_i", "1", "a_f", "2")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    TupleStream stream;
+    List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+    StreamFactory factory = new StreamFactory()
+        .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+        .withFunctionName("search", CloudSolrStream.class)
+        .withFunctionName("null", NullStream.class);
+
+    try {
+      // Basic test
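+      // null() swallows every tuple from the underlying stream and emits a single
+      // tuple whose nullCount field records how many tuples were consumed.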
+      stream = factory.constructStream("null(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc\")");
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assertTrue(tuples.size() == 1);
+      assertTrue(tuples.get(0).getLong("nullCount") == 6);
+    } finally {
+      solrClientCache.close();
+    }
+  }
+
+  @Test
+  public void testParallelNullStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
+        .add(id, "5", "a_s", "hello1", "a_i", "1", "a_f", "2")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    TupleStream stream;
+    List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    StreamFactory factory = new StreamFactory()
+        .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+        .withFunctionName("search", CloudSolrStream.class)
+        .withFunctionName("null", NullStream.class)
+        .withFunctionName("parallel", ParallelStream.class);
+
+    try {
+
+      // Basic test
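+      // Each of the two workers runs its own null() over its partition, so the
+      // two nullCount tuples must sum to the six indexed docs.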
+      stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"nullCount desc\", null(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), by=\"a_i asc\"))");
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assertTrue(tuples.size() == 2);
+      long nullCount = 0;
+      for (Tuple t : tuples) {
+        nullCount += t.getLong("nullCount");
+      }
+
+      assertEquals(6L, nullCount);
+    } finally {
+      solrClientCache.close();
+    }
+  }
+
+  @Test
+  public void testMergeStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    StreamExpression expression;
+    TupleStream stream;
+    List<Tuple> tuples;
+    
+    StreamFactory factory = new StreamFactory()
+      .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+      .withFunctionName("search", CloudSolrStream.class)
+      .withFunctionName("unique", UniqueStream.class)
+      .withFunctionName("merge", MergeStream.class);
+    
+    // Basic test
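+    // merge() performs an ordered merge of streams that are already sorted on the
+    // "on" criteria, like the merge phase of a merge sort.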
+    expression = StreamExpressionParser.parse("merge("
+        + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"),"
+        + "search(" + COLLECTIONORALIAS + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"),"
+        + "on=\"a_f asc\")");
+
+    stream = new MergeStream(expression, factory);
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+    try {
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 4);
+      assertOrder(tuples, 0, 1, 3, 4);
+
+      // Basic test desc
+      expression = StreamExpressionParser.parse("merge("
+          + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
+          + "search(" + COLLECTIONORALIAS + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
+          + "on=\"a_f desc\")");
+      stream = new MergeStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 4);
+      assertOrder(tuples, 4, 3, 1, 0);
+
+      // Basic w/multi comp
+      expression = StreamExpressionParser.parse("merge("
+          + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+          + "search(" + COLLECTIONORALIAS + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+          + "on=\"a_f asc, a_s asc\")");
+      stream = new MergeStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 5);
+      assertOrder(tuples, 0, 2, 1, 3, 4);
+
+      // full factory w/multi comp
+      stream = factory.constructStream("merge("
+          + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+          + "search(" + COLLECTIONORALIAS + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+          + "on=\"a_f asc, a_s asc\")");
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 5);
+      assertOrder(tuples, 0, 2, 1, 3, 4);
+
+      // full factory w/multi streams
+      stream = factory.constructStream("merge("
+          + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+          + "search(" + COLLECTIONORALIAS + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+          + "search(" + COLLECTIONORALIAS + ", q=\"id:(2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+          + "on=\"a_f asc\")");
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 4);
+      assertOrder(tuples, 0, 2, 1, 4);
+    } finally {
+      solrClientCache.close();
+    }
+  }
+
+  @Test
+  public void testRankStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    StreamExpression expression;
+    TupleStream stream;
+    List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    StreamFactory factory = new StreamFactory()
+      .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+      .withFunctionName("search", CloudSolrStream.class)
+      .withFunctionName("unique", UniqueStream.class)
+      .withFunctionName("top", RankStream.class);
+    try {
+      // Basic test
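+      // top(n=3, ...) keeps a bounded priority queue of n tuples and emits them in
+      // the requested sort order once the underlying stream is exhausted.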
+      expression = StreamExpressionParser.parse("top("
+          + "n=3,"
+          + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
+          + "sort=\"a_f asc, a_i asc\")");
+      stream = new RankStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 3);
+      assertOrder(tuples, 0, 2, 1);
+
+      // Basic test desc
+      expression = StreamExpressionParser.parse("top("
+          + "n=2,"
+          + "unique("
+          + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
+          + "over=\"a_f\"),"
+          + "sort=\"a_f desc\")");
+      stream = new RankStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 2);
+      assertOrder(tuples, 4, 3);
+
+      // full factory
+      stream = factory.constructStream("top("
+          + "n=4,"
+          + "unique("
+          + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
+          + "over=\"a_f\"),"
+          + "sort=\"a_f asc\")");
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 4);
+      assertOrder(tuples, 0, 1, 3, 4);
+
+      // full factory, switch order
+      stream = factory.constructStream("top("
+          + "n=4,"
+          + "unique("
+          + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc, a_i desc\"),"
+          + "over=\"a_f\"),"
+          + "sort=\"a_f asc\")");
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 4);
+      assertOrder(tuples, 2, 1, 3, 4);
+    } finally {
+      solrClientCache.close();
+    }
+  }
+
+  @Test
+  public void testReducerStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+    
+    StreamExpression expression;
+    TupleStream stream;
+    List<Tuple> tuples;
+    Tuple t0, t1, t2;
+    List<Map> maps0, maps1, maps2;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    StreamFactory factory = new StreamFactory()
+        .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+        .withFunctionName("search", CloudSolrStream.class)
+        .withFunctionName("reduce", ReducerStream.class)
+        .withFunctionName("group", GroupOperation.class);
+
+    try {
+      // basic
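+      // reduce() groups contiguous tuples sharing the "by" field (the input must be
+      // sorted on it); group(sort="a_f desc", n="4") keeps up to four tuples per
+      // a_s bucket, ordered by a_f descending.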
+      expression = StreamExpressionParser.parse("reduce("
+          + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\"),"
+          + "by=\"a_s\","
+          + "group(sort=\"a_f desc\", n=\"4\"))");
+
+      stream = factory.constructStream(expression);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 3);
+
+      t0 = tuples.get(0);
+      maps0 = t0.getMaps("group");
+      assertMaps(maps0, 9, 1, 2, 0);
+
+      t1 = tuples.get(1);
+      maps1 = t1.getMaps("group");
+      assertMaps(maps1, 8, 7, 5, 3);
+
+
+      t2 = tuples.get(2);
+      maps2 = t2.getMaps("group");
+      assertMaps(maps2, 6, 4);
+
+      // basic w/spaces
+      expression = StreamExpressionParser.parse("reduce("
+          + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f       asc\"),"
+          + "by=\"a_s\"," +
+          "group(sort=\"a_i asc\", n=\"2\"))");
+      stream = factory.constructStream(expression);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 3);
+
+      t0 = tuples.get(0);
+      maps0 = t0.getMaps("group");
+      assert (maps0.size() == 2);
+
+      assertMaps(maps0, 0, 1);
+
+      t1 = tuples.get(1);
+      maps1 = t1.getMaps("group");
+      assertMaps(maps1, 3, 5);
+
+      t2 = tuples.get(2);
+      maps2 = t2.getMaps("group");
+      assertMaps(maps2, 4, 6);
+    } finally {
+      solrClientCache.close();
+    }
+  }
+
+  @Test
+  public void testHavingStream() throws Exception {
+
+    SolrClientCache solrClientCache = new SolrClientCache();
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1", "subject", "blah blah blah 0")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2", "subject", "blah blah blah 2")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "subject", "blah blah blah 3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "subject", "blah blah blah 4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5", "subject", "blah blah blah 1")
+        .add(id, "5", "a_s", "hello3", "a_i", "5", "a_f", "6", "subject", "blah blah blah 5")
+        .add(id, "6", "a_s", "hello4", "a_i", "6", "a_f", "7", "subject", "blah blah blah 6")
+        .add(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "8", "subject", "blah blah blah 7")
+        .add(id, "8", "a_s", "hello3", "a_i", "8", "a_f", "9", "subject", "blah blah blah 8")
+        .add(id, "9", "a_s", "hello0", "a_i", "9", "a_f", "10", "subject", "blah blah blah 9")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    TupleStream stream;
+    List<Tuple> tuples;
+
+    StreamFactory factory = new StreamFactory()
+        .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+        .withFunctionName("search", CloudSolrStream.class)
+        .withFunctionName("having", HavingStream.class)
+        .withFunctionName("rollup", RollupStream.class)
+        .withFunctionName("sum", SumMetric.class)
+        .withFunctionName("and", AndEvaluator.class)
+        .withFunctionName("or", OrEvaluator.class)
+        .withFunctionName("not", NotEvaluator.class)
+        .withFunctionName("gt", GreaterThanEvaluator.class)
+        .withFunctionName("lt", LessThanEvaluator.class)
+        .withFunctionName("eq", EqualToEvaluator.class)
+        .withFunctionName("lteq", LessThanEqualToEvaluator.class)
+        .withFunctionName("gteq", GreaterThanEqualToEvaluator.class);
+
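+    // having() passes through only the tuples for which the boolean evaluator is
+    // true, analogous to a SQL HAVING clause applied to the stream.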
+    stream = factory.constructStream("having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), eq(a_i, 9))");
+    StreamContext context = new StreamContext();
+    context.setSolrClientCache(solrClientCache);
+    stream.setStreamContext(context);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 1);
+    Tuple t = tuples.get(0);
+    assertTrue(t.getString("id").equals("9"));
+
+    stream = factory.constructStream("having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), and(eq(a_i, 9),lt(a_i, 10)))");
+    context = new StreamContext();
+    context.setSolrClientCache(solrClientCache);
+    stream.setStreamContext(context);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 1);
+    t = tuples.get(0);
+    assertTrue(t.getString("id").equals("9"));
+
+    stream = factory.constructStream("having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), or(eq(a_i, 9),eq(a_i, 8)))");
+    context = new StreamContext();
+    context.setSolrClientCache(solrClientCache);
+    stream.setStreamContext(context);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 2);
+    t = tuples.get(0);
+    assertTrue(t.getString("id").equals("8"));
+
+    t = tuples.get(1);
+    assertTrue(t.getString("id").equals("9"));
+
+
+    stream = factory.constructStream("having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), and(eq(a_i, 9),not(eq(a_i, 9))))");
+    context = new StreamContext();
+    context.setSolrClientCache(solrClientCache);
+    stream.setStreamContext(context);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 0);
+
+    stream = factory.constructStream("having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), and(lteq(a_i, 9), gteq(a_i, 8)))");
+    context = new StreamContext();
+    context.setSolrClientCache(solrClientCache);
+    stream.setStreamContext(context);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 2);
+
+    t = tuples.get(0);
+    assertTrue(t.getString("id").equals("8"));
+
+    t = tuples.get(1);
+    assertTrue(t.getString("id").equals("9"));
+
+    stream = factory.constructStream("having(rollup(over=a_f, sum(a_i), search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\")), and(eq(sum(a_i), 9),eq(sum(a_i), 9)))");
+    context = new StreamContext();
+    context.setSolrClientCache(solrClientCache);
+    stream.setStreamContext(context);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 1);
+    t = tuples.get(0);
+    assertTrue(t.getDouble("a_f") == 10.0D);
+
+    solrClientCache.close();
+  }
+
+  @Test
+  public void testParallelHavingStream() throws Exception {
+
+    SolrClientCache solrClientCache = new SolrClientCache();
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1", "subject", "blah blah blah 0")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2", "subject", "blah blah blah 2")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "subject", "blah blah blah 3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "subject", "blah blah blah 4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5", "subject", "blah blah blah 1")
+        .add(id, "5", "a_s", "hello3", "a_i", "5", "a_f", "6", "subject", "blah blah blah 5")
+        .add(id, "6", "a_s", "hello4", "a_i", "6", "a_f", "7", "subject", "blah blah blah 6")
+        .add(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "8", "subject", "blah blah blah 7")
+        .add(id, "8", "a_s", "hello3", "a_i", "8", "a_f", "9", "subject", "blah blah blah 8")
+        .add(id, "9", "a_s", "hello0", "a_i", "9", "a_f", "10", "subject", "blah blah blah 9")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    TupleStream stream;
+    List<Tuple> tuples;
+
+    StreamFactory factory = new StreamFactory()
+        .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+        .withFunctionName("search", CloudSolrStream.class)
+        .withFunctionName("having", HavingStream.class)
+        .withFunctionName("rollup", RollupStream.class)
+        .withFunctionName("sum", SumMetric.class)
+        .withFunctionName("and", AndEvaluator.class)
+        .withFunctionName("or", OrEvaluator.class)
+        .withFunctionName("not", NotEvaluator.class)
+        .withFunctionName("gt", GreaterThanEvaluator.class)
+        .withFunctionName("lt", LessThanEvaluator.class)
+        .withFunctionName("eq", EqualToEvaluator.class)
+        .withFunctionName("lteq", LessThanEqualToEvaluator.class)
+        .withFunctionName("gteq", GreaterThanEqualToEvaluator.class)
+        .withFunctionName("val", RawValueEvaluator.class)
+        .withFunctionName("parallel", ParallelStream.class);
+
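+    // Same assertions as testHavingStream, but each having() runs inside one of two
+    // parallel workers; partitionKeys=id ensures every doc is evaluated exactly once.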
+    stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), eq(a_i, 9)))");
+    StreamContext context = new StreamContext();
+    context.setSolrClientCache(solrClientCache);
+    stream.setStreamContext(context);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 1);
+    Tuple t = tuples.get(0);
+    assertTrue(t.getString("id").equals("9"));
+
+    stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), and(eq(a_i, 9),lt(a_i, 10))))");
+    context = new StreamContext();
+    context.setSolrClientCache(solrClientCache);
+    stream.setStreamContext(context);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 1);
+    t = tuples.get(0);
+    assertTrue(t.getString("id").equals("9"));
+
+    stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\",having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), or(eq(a_i, 9),eq(a_i, 8))))");
+    context = new StreamContext();
+    context.setSolrClientCache(solrClientCache);
+    stream.setStreamContext(context);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 2);
+    t = tuples.get(0);
+    assertTrue(t.getString("id").equals("8"));
+
+    t = tuples.get(1);
+    assertTrue(t.getString("id").equals("9"));
+
+
+    stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), and(eq(a_i, 9),not(eq(a_i, 9)))))");
+    context = new StreamContext();
+    context.setSolrClientCache(solrClientCache);
+    stream.setStreamContext(context);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 0);
+
+
+    stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\",having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), and(lteq(a_i, 9), gteq(a_i, 8))))");
+    context = new StreamContext();
+    context.setSolrClientCache(solrClientCache);
+    stream.setStreamContext(context);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 2);
+
+    t = tuples.get(0);
+    assertTrue(t.getString("id").equals("8"));
+
+    t = tuples.get(1);
+    assertTrue(t.getString("id").equals("9"));
+
+    stream = factory.constructStream("parallel("+COLLECTIONORALIAS+", workers=2, sort=\"a_f asc\", having(rollup(over=a_f, sum(a_i), search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=a_f)), and(eq(sum(a_i), 9),eq(sum(a_i),9))))");
+    context = new StreamContext();
+    context.setSolrClientCache(solrClientCache);
+    stream.setStreamContext(context);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 1);
+
+    t = tuples.get(0);
+    assertTrue(t.getDouble("a_f") == 10.0D);
+
+    solrClientCache.close();
+  }
+
+  @Test
+  public void testFetchStream() throws Exception {
+
+    SolrClientCache solrClientCache = new SolrClientCache(); // TODO: share in @Before; close in @After?
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1", "subject", "blah blah blah 0")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2", "subject", "blah blah blah 2")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "subject", "blah blah blah 3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "subject", "blah blah blah 4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5", "subject", "blah blah blah 1")
+        .add(id, "5", "a_s", "hello3", "a_i", "5", "a_f", "6", "subject", "blah blah blah 5")
+        .add(id, "6", "a_s", "hello4", "a_i", "6", "a_f", "7", "subject", "blah blah blah 6")
+        .add(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "8", "subject", "blah blah blah 7")
+        .add(id, "8", "a_s", "hello3", "a_i", "8", "a_f", "9", "subject", "blah blah blah 8")
+        .add(id, "9", "a_s", "hello0", "a_i", "9", "a_f", "10", "subject", "blah blah blah 9")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    TupleStream stream;
+    List<Tuple> tuples;
+
+    StreamFactory factory = new StreamFactory()
+        .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+        .withFunctionName("search", CloudSolrStream.class)
+        .withFunctionName("fetch", FetchStream.class);
+
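+    // fetch() joins extra stored fields onto the stream: it batches batchSize tuples,
+    // issues one secondary query per batch, and matches on "id=a_i" (tuple field =
+    // field in the fetched collection).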
+    stream = factory.constructStream("fetch("+ COLLECTIONORALIAS +",  search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\")");
+    StreamContext context = new StreamContext();
+    context.setSolrClientCache(solrClientCache);
+    stream.setStreamContext(context);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 10);
+    Tuple t = tuples.get(0);
+    assertTrue("blah blah blah 0".equals(t.getString("subject")));
+    t = tuples.get(1);
+    assertTrue("blah blah blah 2".equals(t.getString("subject")));
+    t = tuples.get(2);
+    assertTrue("blah blah blah 3".equals(t.getString("subject")));
+    t = tuples.get(3);
+    assertTrue("blah blah blah 4".equals(t.getString("subject")));
+    t = tuples.get(4);
+    assertTrue("blah blah blah 1".equals(t.getString("subject")));
+    t = tuples.get(5);
+    assertTrue("blah blah blah 5".equals(t.getString("subject")));
+    t = tuples.get(6);
+    assertTrue("blah blah blah 6".equals(t.getString("subject")));
+    t = tuples.get(7);
+    assertTrue("blah blah blah 7".equals(t.getString("subject")));
+    t = tuples.get(8);
+    assertTrue("blah blah blah 8".equals(t.getString("subject")));
+    t = tuples.get(9);
+    assertTrue("blah blah blah 9".equals(t.getString("subject")));
+
+    //Change the batch size
+    stream = factory.constructStream("fetch(" + COLLECTIONORALIAS + ",  search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\")");
+    context = new StreamContext();
+    context.setSolrClientCache(solrClientCache);
+    stream.setStreamContext(context);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 10);
+    t = tuples.get(0);
+    assertTrue("blah blah blah 0".equals(t.getString("subject")));
+    t = tuples.get(1);
+    assertTrue("blah blah blah 2".equals(t.getString("subject")));
+    t = tuples.get(2);
+    assertTrue("blah blah blah 3".equals(t.getString("subject")));
+    t = tuples.get(3);
+    assertTrue("blah blah blah 4".equals(t.getString("subject")));
+    t = tuples.get(4);
+    assertTrue("blah blah blah 1".equals(t.getString("subject")));
+    t = tuples.get(5);
+    assertTrue("blah blah blah 5".equals(t.getString("subject")));
+    t = tuples.get(6);
+    assertTrue("blah blah blah 6".equals(t.getString("subject")));
+    t = tuples.get(7);
+    assertTrue("blah blah blah 7".equals(t.getString("subject")));
+    t = tuples.get(8);
+    assertTrue("blah blah blah 8".equals(t.getString("subject")));
+    t = tuples.get(9);
+    assertTrue("blah blah blah 9".equals(t.getString("subject")));
+
+    // SOLR-10404 test that "hello 99" as a value gets escaped
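+    // The join value "hello 99" contains a space, so fetch must quote/escape it when
+    // building the secondary query instead of letting the space split the terms.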
+    new UpdateRequest()
+        .add(id, "99", "a1_s", "hello 99", "a2_s", "hello 99", "subject", "blah blah blah 99")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    stream = factory.constructStream("fetch("+ COLLECTIONORALIAS +",  search(" + COLLECTIONORALIAS + ", q=" + id + ":99, fl=\"id,a1_s\", sort=\"id asc\"), on=\"a1_s=a2_s\", fl=\"subject\")");
+    context = new StreamContext();
+    context.setSolrClientCache(solrClientCache);
+    stream.setStreamContext(context);
+    tuples = getTuples(stream);
+
+    assertEquals(1, tuples.size());
+    t = tuples.get(0);
+    assertTrue("blah blah blah 99".equals(t.getString("subject")));
+
+    solrClientCache.close();
+  }
+
+  @Test
+  public void testParallelFetchStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1", "subject", "blah blah blah 0")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2", "subject", "blah blah blah 2")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "subject", "blah blah blah 3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "subject", "blah blah blah 4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5", "subject", "blah blah blah 1")
+        .add(id, "5", "a_s", "hello3", "a_i", "5", "a_f", "6", "subject", "blah blah blah 5")
+        .add(id, "6", "a_s", "hello4", "a_i", "6", "a_f", "7", "subject", "blah blah blah 6")
+        .add(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "8", "subject", "blah blah blah 7")
+        .add(id, "8", "a_s", "hello3", "a_i", "8", "a_f", "9", "subject", "blah blah blah 8")
+        .add(id, "9", "a_s", "hello0", "a_i", "9", "a_f", "10", "subject", "blah blah blah 9")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    TupleStream stream;
+    List<Tuple> tuples;
+
+    StreamFactory factory = new StreamFactory()
+        .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+        .withFunctionName("search", CloudSolrStream.class)
+        .withFunctionName("parallel", ParallelStream.class)
+        .withFunctionName("fetch", FetchStream.class);
+
+    try {
+
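+      // Each worker runs fetch() over its own partition of the tuples, so all ten
+      // subjects still come back regardless of the per-worker batch size.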
+      stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTIONORALIAS + ",  search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\"))");
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 10);
+      Tuple t = tuples.get(0);
+      assertTrue("blah blah blah 0".equals(t.getString("subject")));
+      t = tuples.get(1);
+      assertTrue("blah blah blah 2".equals(t.getString("subject")));
+      t = tuples.get(2);
+      assertTrue("blah blah blah 3".equals(t.getString("subject")));
+      t = tuples.get(3);
+      assertTrue("blah blah blah 4".equals(t.getString("subject")));
+      t = tuples.get(4);
+      assertTrue("blah blah blah 1".equals(t.getString("subject")));
+      t = tuples.get(5);
+      assertTrue("blah blah blah 5".equals(t.getString("subject")));
+      t = tuples.get(6);
+      assertTrue("blah blah blah 6".equals(t.getString("subject")));
+      t = tuples.get(7);
+      assertTrue("blah blah blah 7".equals(t.getString("subject")));
+      t = tuples.get(8);
+      assertTrue("blah blah blah 8".equals(t.getString("subject")));
+      t = tuples.get(9);
+      assertTrue("blah blah blah 9".equals(t.getString("subject")));
+
+
+      stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTIONORALIAS + ",  search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\"))");
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 10);
+      t = tuples.get(0);
+      assertTrue("blah blah blah 0".equals(t.getString("subject")));
+      t = tuples.get(1);
+      assertTrue("blah blah blah 2".equals(t.getString("subject")));
+      t = tuples.get(2);
+      assertTrue("blah blah blah 3".equals(t.getString("subject")));
+      t = tuples.get(3);
+      assertTrue("blah blah blah 4".equals(t.getString("subject")));
+      t = tuples.get(4);
+      assertTrue("blah blah blah 1".equals(t.getString("subject")));
+      t = tuples.get(5);
+      assertTrue("blah blah blah 5".equals(t.getString("subject")));
+      t = tuples.get(6);
+      assertTrue("blah blah blah 6".equals(t.getString("subject")));
+      t = tuples.get(7);
+      assertTrue("blah blah blah 7".equals(t.getString("subject")));
+      t = tuples.get(8);
+      assertTrue("blah blah blah 8".equals(t.getString("subject")));
+      t = tuples.get(9);
+      assertTrue("blah blah blah 9".equals(t.getString("subject")));
+    } finally {
+      solrClientCache.close();
+    }
+  }
+
+  @Test
+  public void testDaemonStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    StreamFactory factory = new StreamFactory()
+        .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+        .withFunctionName("search", CloudSolrStream.class)
+        .withFunctionName("rollup", RollupStream.class)
+        .withFunctionName("sum", SumMetric.class)
+        .withFunctionName("min", MinMetric.class)
+        .withFunctionName("max", MaxMetric.class)
+        .withFunctionName("avg", MeanMetric.class)
+        .withFunctionName("count", CountMetric.class)
+        .withFunctionName("daemon", DaemonStream.class);
+
+    StreamExpression expression;
+    DaemonStream daemonStream;
+
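+    // daemon() re-runs the wrapped rollup every runInterval ms and pushes each result
+    // tuple onto an internal blocking queue of queueSize entries; read() drains that queue.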
+    expression = StreamExpressionParser.parse("daemon(rollup("
+        + "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"a_i,a_s\", sort=\"a_s asc\"),"
+        + "over=\"a_s\","
+        + "sum(a_i)"
+        + "), id=\"test\", runInterval=\"1000\", queueSize=\"9\")");
+    daemonStream = (DaemonStream)factory.constructStream(expression);
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+    daemonStream.setStreamContext(streamContext);
+    try {
+      //Test Long and Double Sums
+
+      daemonStream.open(); // This will start the daemon thread
+
+      for (int i = 0; i < 4; i++) {
+        Tuple tuple = daemonStream.read(); // Reads from the queue
+        String bucket = tuple.getString("a_s");
+        Double sumi = tuple.getDouble("sum(a_i)");
+
+        assertTrue(bucket.equals("hello0"));
+        assertTrue(sumi.doubleValue() == 17.0D);
+
+        tuple = daemonStream.read();
+        bucket = tuple.getString("a_s");
+        sumi = tuple.getDouble("sum(a_i)");
+
+        assertTrue(bucket.equals("hello3"));
+        assertTrue(sumi.doubleValue() == 38.0D);
+
+        tuple = daemonStream.read();
+        bucket = tuple.getString("a_s");
+        sumi = tuple.getDouble("sum(a_i)");
+        assertTrue(bucket.equals("hello4"));
+        assertTrue(sumi.longValue() == 15);
+      }
+
+      // Now let's wait until the internal queue fills up.
+
+      while (daemonStream.remainingCapacity() > 0) {
+        try {
+          Thread.sleep(1000);
+        } catch (Exception e) {
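+          // Ignore interruptions and keep polling for a full queue.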
+
+        }
+      }
+
+      // OK, capacity is full; let's index a new doc.
+
+      new UpdateRequest()
+          .add(id, "10", "a_s", "hello0", "a_i", "1", "a_f", "10")
+          .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+      // Now let's drain the 9 docs already in the queue, plus 3 more to get past the
+      // run that was blocked. The next run should have the tuples with the updated count.
+      for (int i = 0; i < 12; i++) {
+        daemonStream.read();
+      }
+
+      // And rerun the loop; the hello0 bucket should now have the updated sum.
+      for (int i = 0; i < 4; i++) {
+        Tuple tuple = daemonStream.read(); // Reads from the queue
+        String bucket = tuple.getString("a_s");
+        Double sumi = tuple.getDouble("sum(a_i)");
+
+        assertTrue(bucket.equals("hello0"));
+        assertTrue(sumi.doubleValue() == 18.0D);
+
+        tuple = daemonStream.read();
+        bucket = tuple.getString("a_s");
+        sumi = tuple.getDouble("sum(a_i)");
+
+        assertTrue(bucket.equals("hello3"));
+        assertTrue(sumi.doubleValue() == 38.0D);
+
+        tuple = daemonStream.read();
+        bucket = tuple.getString("a_s");
+        sumi = tuple.getDouble("sum(a_i)");
+        assertTrue(bucket.equals("hello4"));
+        assertTrue(sumi.longValue() == 15);
+      }
+    } finally {
+      daemonStream.close(); //This should stop the daemon thread
+      solrClientCache.close();
+    }
+  }
+
+  @Test
+  public void testTerminatingDaemonStream() throws Exception {
+    Assume.assumeTrue(!useAlias);
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    StreamFactory factory = new StreamFactory()
+        .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+        .withFunctionName("topic", TopicStream.class)
+        .withFunctionName("daemon", DaemonStream.class);
+
+    StreamExpression expression;
+    DaemonStream daemonStream;
+
+    SolrClientCache cache = new SolrClientCache();
+    StreamContext context = new StreamContext();
+    context.setSolrClientCache(cache);
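+    // With terminate=true the daemon shuts itself down once the wrapped topic stream
+    // stops returning new documents, so getTuples() drains all ten ids and returns
+    // instead of blocking forever.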
+    expression = StreamExpressionParser.parse("daemon(topic("+ COLLECTIONORALIAS +","+ COLLECTIONORALIAS +", q=\"a_s:hello\", initialCheckpoint=0, id=\"topic1\", rows=2, fl=\"id\""
+        + "), id=test, runInterval=1000, terminate=true, queueSize=50)");
+    daemonStream = (DaemonStream)factory.constructStream(expression);
+    daemonStream.setStreamContext(context);
+
+    List<Tuple> tuples = getTuples(daemonStream);
+    assertTrue(tuples.size() == 10);
+    cache.close();
+  }
+
+  @Test
+  public void testRollupStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    StreamFactory factory = new StreamFactory()
+      .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+      .withFunctionName("search", CloudSolrStream.class)
+      .withFunctionName("rollup", RollupStream.class)
+      .withFunctionName("sum", SumMetric.class)
+      .withFunctionName("min", MinMetric.class)
+      .withFunctionName("max", MaxMetric.class)
+      .withFunctionName("avg", MeanMetric.class)
+      .withFunctionName("count", CountMetric.class);     
+    
+    StreamExpression expression;
+    TupleStream stream;
+    List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+    try {
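+      // rollup() expects its input sorted on the "over" field(s) and emits one tuple
+      // per a_s bucket carrying the requested aggregate metrics.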
+      expression = StreamExpressionParser.parse("rollup("
+          + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"a_s,a_i,a_f\", sort=\"a_s asc\"),"
+          + "over=\"a_s\","
+          + "sum(a_i),"
+          + "sum(a_f),"
+          + "min(a_i),"
+          + "min(a_f),"
+          + "max(a_i),"
+          + "max(a_f),"
+          + "avg(a_i),"
+          + "avg(a_f),"
+          + "count(*),"
+          + ")");
+      stream = factory.constructStream(expression);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 3);
+
+      //Test Long and Double Sums
+
+      Tuple tuple = tuples.get(0);
+      String bucket = tuple.getString("a_s");
+      Double sumi = tuple.getDouble("sum(a_i)");
+      Double sumf = tuple.getDouble("sum(a_f)");
+      Double mini = tuple.getDouble("min(a_i)");
+      Double minf = tuple.getDouble("min(a_f)");
+      Double maxi = tuple.getDouble("max(a_i)");
+      Double maxf = tuple.getDouble("max(a_f)");
+      Double avgi = tuple.getDouble("avg(a_i)");
+      Double avgf = tuple.getDouble("avg(a_f)");
+      Double count = tuple.getDouble("count(*)");
+
+      assertTrue(bucket.equals("hello0"));
+      assertTrue(sumi.doubleValue() == 17.0D);
+      assertTrue(sumf.doubleValue() == 18.0D);
+      assertTrue(mini.doubleValue() == 0.0D);
+      assertTrue(minf.doubleValue() == 1.0D);
+      assertTrue(maxi.doubleValue() == 14.0D);
+      assertTrue(maxf.doubleValue() == 10.0D);
+      assertTrue(avgi.doubleValue() == 4.25D);
+      assertTrue(avgf.doubleValue() == 4.5D);
+      assertTrue(count.doubleValue() == 4);
+
+      tuple = tuples.get(1);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertTrue(bucket.equals("hello3"));
+      assertTrue(sumi.doubleValue() == 38.0D);
+      assertTrue(sumf.doubleValue() == 26.0D);
+      assertTrue(mini.doubleValue() == 3.0D);
+      assertTrue(minf.doubleValue() == 3.0D);
+      assertTrue(maxi.doubleValue() == 13.0D);
+      assertTrue(maxf.doubleValue() == 9.0D);
+      assertTrue(avgi.doubleValue() == 9.5D);
+      assertTrue(avgf.doubleValue() == 6.5D);
+      assertTrue(count.doubleValue() == 4);
+
+      tuple = tuples.get(2);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertTrue(bucket.equals("hello4"));
+      assertTrue(sumi.longValue() == 15);
+      assertTrue(sumf.doubleValue() == 11.0D);
+      assertTrue(mini.doubleValue() == 4.0D);
+      assertTrue(minf.doubleValue() == 4.0D);
+      assertTrue(maxi.doubleValue() == 11.0D);
+      assertTrue(maxf.doubleValue() == 7.0D);
+      assertTrue(avgi.doubleValue() == 7.5D);
+      assertTrue(avgf.doubleValue() == 5.5D);
+      assertTrue(count.doubleValue() == 2);
+
+    } finally {
+      solrClientCache.close();
+    }
+  }
+
+  @Test
+  public void testParallelUniqueStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
+        .add(id, "5", "a_s", "hello1", "a_i", "10", "a_f", "1")
+        .add(id, "6", "a_s", "hello1", "a_i", "11", "a_f", "5")
+        .add(id, "7", "a_s", "hello1", "a_i", "12", "a_f", "5")
+        .add(id, "8", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    String zkHost = cluster.getZkServer().getZkAddress();
+    StreamFactory streamFactory = new StreamFactory().withCollectionZkHost(COLLECTIONORALIAS, zkHost)
+        .withFunctionName("search", CloudSolrStream.class)
+        .withFunctionName("unique", UniqueStream.class)
+        .withFunctionName("top", RankStream.class)
+        .withFunctionName("group", ReducerStream.class)
+        .withFunctionName("parallel", ParallelStream.class);
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    try {
+
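+      // partitionKeys="a_f" hashes tuples across the two workers by a_f, so all
+      // duplicates of a given a_f value land on the same worker and unique() stays correct.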
+      ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"), over=\"a_f\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_f asc\")");
+      pstream.setStreamContext(streamContext);
+      List<Tuple> tuples = getTuples(pstream);
+      assert (tuples.size() == 5);
+      assertOrder(tuples, 0, 1, 3, 4, 6);
+
+      //Test the eofTuples
+
+      Map<String, Tuple> eofTuples = pstream.getEofTuples();
+      assertEquals(2, eofTuples.size()); //There should be an EOF tuple for each worker.
+    } finally {
+      solrClientCache.close();
+    }
+  }
+
+  @Test
+  public void testParallelShuffleStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
+        .add(id, "5", "a_s", "hello1", "a_i", "10", "a_f", "1")
+        .add(id, "6", "a_s", "hello1", "a_i", "11", "a_f", "5")
+        .add(id, "7", "a_s", "hello1", "a_i", "12", "a_f", "5")
+        .add(id, "8", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "9", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "10", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "11", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "12", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "13", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "14", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "15", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "16", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "17", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "18", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "19", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "20", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "21", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "22", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "23", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "24", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "25", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "26", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "27", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "28", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "29", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "30", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "31", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "32", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "33", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "34", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "35", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "36", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "37", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "38", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "39", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "40", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "41", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "42", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "43", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "44", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "45", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "46", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "47", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "48", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "49", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "50", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "51", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "52", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "53", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "54", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "55", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "56", "a_s", "hello1", "a_i", "13", "a_f", "1000")
+
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    String zkHost = cluster.getZkServer().getZkAddress();
+    StreamFactory streamFactory = new StreamFactory().withCollectionZkHost(COLLECTIONORALIAS, zkHost)
+        .withFunctionName("shuffle", ShuffleStream.class)
+        .withFunctionName("unique", UniqueStream.class)
+        .withFunctionName("parallel", ParallelStream.class);
+
+    try {
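+      // shuffle() behaves like search() but streams the entire sorted result set through
+      // the /export handler, which lets it feed large partitions to the parallel workers.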
+      ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", unique(shuffle(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"), over=\"a_f\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_f asc\")");
+      pstream.setStreamFactory(streamFactory);
+      pstream.setStreamContext(streamContext);
+      List<Tuple> tuples = getTuples(pstream);
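+      // Six distinct a_f values exist here (0, 1, 3, 4, 5 and 1000); the first tuple for
+      // each is kept, and id 56 (a_f=1000) supplies the sixth.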
+      assertEquals(6, tuples.size());
+      assertOrder(tuples, 0, 1, 3, 4, 6, 56);
+
+      //Test the eofTuples
+
+      Map<String, Tuple> eofTuples = pstream.getEofTuples();
+      assertEquals(2, eofTuples.size()); //There should be an EOF tuple for each worker.
+      assertTrue(pstream.toExpression(streamFactory).toString().contains("shuffle"));
+    } finally {
+      solrClientCache.close();
+    }
+  }
+
+  @Test
+  public void testParallelReducerStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    String zkHost = cluster.getZkServer().getZkAddress();
+    StreamFactory streamFactory = new StreamFactory().withCollectionZkHost(COLLECTIONORALIAS, zkHost)
+        .withFunctionName("search", CloudSolrStream.class)
+        .withFunctionName("group", GroupOperation.class)
+        .withFunctionName("reduce", ReducerStream.class)
+        .withFunctionName("parallel", ParallelStream.class);
+
+    try {
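+      // reduce() groups the sorted stream by a_s; the group() operation collects up to
+      // n=5 tuples per bucket, ordered by a_i asc, and emits them as a list of maps.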
+      ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", " +
+          "reduce(" +
+          "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc,a_f asc\", partitionKeys=\"a_s\"), " +
+          "by=\"a_s\"," +
+          "group(sort=\"a_i asc\", n=\"5\")), " +
+          "workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_s asc\")");
+
+      pstream.setStreamContext(streamContext);
+
+      List<Tuple> tuples = getTuples(pstream);
+
+      assertEquals(3, tuples.size());
+
+      Tuple t0 = tuples.get(0);
+      List<Map> maps0 = t0.getMaps("group");
+      assertMaps(maps0, 0, 1, 2, 9);
+
+      Tuple t1 = tuples.get(1);
+      List<Map> maps1 = t1.getMaps("group");
+      assertMaps(maps1, 3, 5, 7, 8);
+
+      Tuple t2 = tuples.get(2);
+      List<Map> maps2 = t2.getMaps("group");
+      assertMaps(maps2, 4, 6);
+
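+      // Repeat with descending sorts: both the bucket order and the order within each group reverse.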
+      pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", " +
+          "reduce(" +
+          "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc,a_f asc\", partitionKeys=\"a_s\"), " +
+          "by=\"a_s\", " +
+          "group(sort=\"a_i desc\", n=\"5\"))," +
+          "workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_s desc\")");
+
+      pstream.setStreamContext(streamContext);
+      tuples = getTuples(pstream);
+
+      assertEquals(3, tuples.size());
+
+      t0 = tuples.get(0);
+      maps0 = t0.getMaps("group");
+      assertMaps(maps0, 6, 4);
+
+      t1 = tuples.get(1);
+      maps1 = t1.getMaps("group");
+      assertMaps(maps1, 8, 7, 5, 3);
+
+      t2 = tuples.get(2);
+      maps2 = t2.getMaps("group");
+      assertMaps(maps2, 9, 2, 1, 0);
+    } finally {
+      solrClientCache.close();
+    }
+
+  }
+
+  @Test
+  public void testParallelRankStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "5", "a_s", "hello1", "a_i", "5", "a_f", "1")
+        .add(id, "6", "a_s", "hello1", "a_i", "6", "a_f", "1")
+        .add(id, "7", "a_s", "hello1", "a_i", "7", "a_f", "1")
+        .add(id, "8", "a_s", "hello1", "a_i", "8", "a_f", "1")
+        .add(id, "9", "a_s", "hello1", "a_i", "9", "a_f", "1")
+        .add(id, "10", "a_s", "hello1", "a_i", "10", "a_f", "1")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    String zkHost = cluster.getZkServer().getZkAddress();
+    StreamFactory streamFactory = new StreamFactory().withCollectionZkHost(COLLECTIONORALIAS, zkHost)
+        .withFunctionName("search", CloudSolrStream.class)
+        .withFunctionName("unique", UniqueStream.class)
+        .withFunctionName("top", RankStream.class)
+        .withFunctionName("group", ReducerStream.class)
+        .withFunctionName("parallel", ParallelStream.class);
+
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+    try {
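+      // top() re-ranks the partitioned stream by a_i desc; only ten documents are indexed,
+      // so n=11 returns the full set in descending a_i order.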
+      ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel("
+          + COLLECTIONORALIAS + ", "
+          + "top("
+          + "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), "
+          + "n=\"11\", "
+          + "sort=\"a_i desc\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i desc\")");
+      pstream.setStreamContext(streamContext);
+      List<Tuple> tuples = getTuples(pstream);
+
+      assertEquals(10, tuples.size());
+      assertOrder(tuples, 10, 9, 8, 7, 6, 5, 4, 3, 2, 0);
+    } finally {
+      solrClientCache.close();
+    }
+
+  }
+
+  @Test
+  public void testParallelMergeStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
+        .add(id, "5", "a_s", "hello0", "a_i", "10", "a_f", "0")
+        .add(id, "6", "a_s", "hello2", "a_i", "8", "a_f", "0")
+        .add(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "3")
+        .add(id, "8", "a_s", "hello4", "a_i", "11", "a_f", "4")
+        .add(id, "9", "a_s", "hello1", "a_i", "100", "a_f", "1")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    String zkHost = cluster.getZkServer().getZkAddress();
+    StreamFactory streamFactory = new StreamFactory().withCollectionZkHost(COLLECTIONORALIAS, zkHost)
+        .withFunctionName("search", CloudSolrStream.class)
+        .withFunctionName("unique", UniqueStream.class)
+        .withFunctionName("top", RankStream.class)
+        .withFunctionName("group", ReducerStream.class)
+        .withFunctionName("merge", MergeStream.class)
+        .withFunctionName("parallel", ParallelStream.class);
+
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+    try {
+      //Test ascending
+      ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", merge(search(" + COLLECTIONORALIAS + ", q=\"id:(4 1 8 7 9)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), search(" + COLLECTIONORALIAS + ", q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), on=\"a_i asc\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i asc\")");
+      pstream.setStreamContext(streamContext);
+      List<Tuple> tuples = getTuples(pstream);
+
+      assertEquals(9, tuples.size());
+      assertOrder(tuples, 0, 1, 2, 3, 4, 7, 6, 8, 9);
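+      // The merge is ordered on a_i, not id: id 7 (a_i=7) precedes id 6 (a_i=8),
+      // and id 9 (a_i=100) comes last.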
+
+      //Test descending
+
+      pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", merge(search(" + COLLECTIONORALIAS + ", q=\"id:(4 1 8 9)\", fl=\"id,a_s,a_i\", sort=\"a_i desc\", partitionKeys=\"a_i\"), search(" + COLLECTIONORALIAS + ", q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i desc\", partitionKeys=\"a_i\"), on=\"a_i desc\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i desc\")");
+      pstream.setStreamContext(streamContext);
+      tuples = getTuples(pstream);
+
+      assertEquals(8, tuples.size());
+      assertOrder(tuples, 9, 8, 6, 4, 3, 2, 1, 0);
+    } finally {
+      solrClientCache.close();
+    }
+
+  }
+
+  @Test
+  public void testParallelRollupStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    StreamFactory factory = new StreamFactory()
+      .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+      .withFunctionName("search", CloudSolrStream.class)
+      .withFunctionName("parallel", ParallelStream.class)
+      .withFunctionName("rollup", RollupStream.class)
+      .withFunctionName("sum", SumMetric.class)
+      .withFunctionName("min", MinMetric.class)
+      .withFunctionName("max", MaxMetric.class)
+      .withFunctionName("avg", MeanMetric.class)
+      .withFunctionName("count", CountMetric.class);
+
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    StreamExpression expression;
+    TupleStream stream;
+    List<Tuple> tuples;
+
+    try {
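+      // rollup() aggregates over the a_s buckets of its sorted input; partitionKeys="a_s"
+      // guarantees all tuples for a bucket reach the same worker, so each worker produces
+      // complete rollups for its share of the buckets.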
+      expression = StreamExpressionParser.parse("parallel(" + COLLECTIONORALIAS + ","
+              + "rollup("
+              + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"a_s,a_i,a_f\", sort=\"a_s asc\", partitionKeys=\"a_s\"),"
+              + "over=\"a_s\","
+              + "sum(a_i),"
+              + "sum(a_f),"
+              + "min(a_i),"
+              + "min(a_f),"
+              + "max(a_i),"
+              + "max(a_f),"
+              + "avg(a_i),"
+              + "avg(a_f),"
+              + "count(*)"
+              + "),"
+              + "workers=\"2\", zkHost=\"" + cluster.getZkServer().getZkAddress() + "\", sort=\"a_s asc\")"
+      );
+
+      stream = factory.constructStream(expression);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assertEquals(3, tuples.size());
+
+      //Test Long and Double Sums
+
+      Tuple tuple = tuples.get(0);
+      String bucket = tuple.getString("a_s");
+      Double sumi = tuple.getDouble("sum(a_i)");
+      Double sumf = tuple.getDouble("sum(a_f)");
+      Double mini = tuple.getDouble("min(a_i)");
+      Double minf = tuple.getDouble("min(a_f)");
+      Double maxi = tuple.getDouble("max(a_i)");
+      Double maxf = tuple.getDouble("max(a_f)");
+      Double avgi = tuple.getDouble("avg(a_i)");
+      Double avgf = tuple.getDouble("avg(a_f)");
+      Double count = tuple.getDouble("count(*)");
+
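+      // hello0 covers ids 0, 2, 1 and 9: sum(a_i) = 0+2+1+14 = 17, sum(a_f) = 1+2+5+10 = 18,
+      // avg(a_i) = 17/4 = 4.25, avg(a_f) = 18/4 = 4.5, count = 4.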
+      assertEquals("hello0", bucket);
+      assertEquals(17.0D, sumi.doubleValue(), 0.0);
+      assertEquals(18.0D, sumf.doubleValue(), 0.0);
+      assertEquals(0.0D, mini.doubleValue(), 0.0);
+      assertEquals(1.0D, minf.doubleValue(), 0.0);
+      assertEquals(14.0D, maxi.doubleValue(), 0.0);
+      assertEquals(10.0D, maxf.doubleValue(), 0.0);
+      assertEquals(4.25D, avgi.doubleValue(), 0.0);
+      assertEquals(4.5D, avgf.doubleValue(), 0.0);
+      assertEquals(4.0D, count.doubleValue(), 0.0);
+
+      tuple = tuples.get(1);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello3", bucket);
+      assertEquals(38.0D, sumi.doubleValue(), 0.0);
+      assertEquals(26.0D, sumf.doubleValue(), 0.0);
+      assertEquals(3.0D, mini.doubleValue(), 0.0);
+      assertEquals(3.0D, minf.doubleValue(), 0.0);
+      assertEquals(13.0D, maxi.doubleValue(), 0.0);
+      assertEquals(9.0D, maxf.doubleValue(), 0.0);
+      assertEquals(9.5D, avgi.doubleValue(), 0.0);
+      assertEquals(6.5D, avgf.doubleValue(), 0.0);
+      assertEquals(4.0D, count.doubleValue(), 0.0);
+
+      tuple = tuples.get(2);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello4", bucket);
+      assertEquals(15.0D, sumi.doubleValue(), 0.0);
+      assertEquals(11.0D, sumf.doubleValue(), 0.0);
+      assertEquals(4.0D, mini.doubleValue(), 0.0);
+      assertEquals(4.0D, minf.doubleValue(), 0.0);
+      assertEquals(11.0D, maxi.doubleValue(), 0.0);
+      assertEquals(7.0D, maxf.doubleValue(), 0.0);
+      assertEquals(7.5D, avgi.doubleValue(), 0.0);
+      assertEquals(5.5D, avgf.doubleValue(), 0.0);
+      assertEquals(2.0D, count.doubleValue(), 0.0);
+    } finally {
+      solrClientCache.close();
+    }
+  }
+
+  @Test
+  public void testInnerJoinStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "1", "side_s", "left", "join1_i", "0", "join2_s", "a", "ident_s", "left_1") // 8, 9
+        .add(id, "15", "side_s", "left", "join1_i", "0", "join2_s", "a", "ident_s", "left_1") // 8, 9
+        .add(id, "2", "side_s", "left", "join1_i", "0", "join2_s", "b", "ident_s", "left_2")
+        .add(id, "3", "side_s", "left", "join1_i", "1", "join2_s", "a", "ident_s", "left_3") // 10
+        .add(id, "4", "side_s", "left", "join1_i", "1", "join2_s", "b", "ident_s", "left_4") // 11
+        .add(id, "5", "side_s", "left", "join1_i", "1", "join2_s", "c", "ident_s", "left_5") // 12
+        .add(id, "6", "side_s", "left", "join1_i", "2", "join2_s", "d", "ident_s", "left_6")
+        .add(id, "7", "side_s", "left", "join1_i", "3", "join2_s", "e", "ident_s", "left_7") // 14
+
+        .add(id, "8", "side_s", "right", "join1_i", "0", "join2_s", "a", "ident_s", "right_1", "join3_i", "0") // 1,15
+        .add(id, "9", "side_s", "right", "join1_i", "0", "join2_s", "a", "ident_s", "right_2", "join3_i", "0") // 1,15
+        .add(id, "10", "side_s", "right", "join1_i", "1", "join2_s", "a", "ident_s", "right_3", "join3_i", "1") // 3
+        .add(id, "11", "side_s", "right", "join1_i", "1", "join2_s", "b", "ident_s", "right_4", "join3_i", "1") // 4
+        .add(id, "12", "side_s", "right", "join1_i", "1", "join2_s", "c", "ident_s", "right_5", "join3_i", "1") // 5
+        .add(id, "13", "side_s", "right", "join1_i", "2", "join2_s", "dad", "ident_s", "right_6", "join3_i", "2")
+        .add(id, "14", "side_s", "right", "join1_i", "3", "join2_s", "e", "ident_s", "right_7", "join3_i", "3") // 7
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    StreamExpression expression;
+    TupleStream stream;
+    List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+    
+    StreamFactory factory = new StreamFactory()
+      .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+      .withFunctionName("search", CloudSolrStream.class)
+      .withFunctionName("innerJoin", InnerJoinStream.class);
+
+    try {
+      // Basic test
+      expression = StreamExpressionParser.parse("innerJoin("
+          + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
+          + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc\"),"
+          + "on=\"join1_i=join1_i, join2_s=join2_s\")");
+      stream = new InnerJoinStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assertEquals(8, tuples.size());
+      assertOrder(tuples, 1, 1, 15, 15, 3, 4, 5, 7);
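+      // Left ids 1 and 15 each match right docs 8 and 9 on (join1_i, join2_s), so each
+      // appears twice; ids 2 and 6 have no right-side match and are dropped.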
+
+      // Basic desc
+      expression = StreamExpressionParser.parse("innerJoin("
+          + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+          + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+          + "on=\"join1_i=join1_i, join2_s=join2_s\")");
+      stream = new InnerJoinStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assertEquals(8, tuples.size());
+      assertOrder(tuples, 7, 3, 4, 5, 1, 1, 15, 15);
+
+      // Results in both searches, no join matches
+      expression = StreamExpressionParser.parse("innerJoin("
+          + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\"),"
+          + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\", aliases=\"id=right.id, join1_i=right.join1_i, join2_s=right.join2_s, ident_s=right.ident_s\"),"
+          + "on=\"ident_s=right.ident_s\")");
+      stream = new InnerJoinStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assertEquals(0, tuples.size());
+
+      // Differing field names
+      expression = StreamExpressionParser.parse("innerJoin("
+          + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
+          + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"join3_i,join2_s,ident_s\", sort=\"join3_i asc, join2_s asc\", aliases=\"join3_i=aliasesField\"),"
+          + "on=\"join1_i=aliasesField, join2_s=join2_s\")");
+      stream = new InnerJoinStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assertEquals(8, tuples.size());
+      assertOrder(tuples, 1, 1, 15, 15, 3, 4, 5, 7);
+    } finally {
+      solrClientCache.close();
+    }
+  }
+
+  @Test
+  public void testLeftOuterJoinStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "1", "side_s", "left", "join1_i", "0", "join2_s", "a", "ident_s", "left_1") // 8, 9
+        .add(id, "15", "side_s", "left", "join1_i", "0", "join2_s", "a", "ident_s", "left_1") // 8, 9
+        .add(id, "2", "side_s", "left", "join1_i", "0", "join2_s", "b", "ident_s", "left_2")
+        .add(id, "3", "side_s", "left", "join1_i", "1", "join2_s", "a", "ident_s", "left_3") // 10
+        .add(id, "4", "side_s", "left", "join1_i", "1", "join2_s", "b", "ident_s", "left_4") // 11
+        .add(id, "5", "side_s", "left", "join1_i", "1", "join2_s", "c", "ident_s", "left_5") // 12
+        .add(id, "6", "side_s", "left", "join1_i", "2", "join2_s", "d", "ident_s", "left_6")
+        .add(id, "7", "side_s", "left", "join1_i", "3", "join2_s", "e", "ident_s", "left_7") // 14
+
+        .add(id, "8", "side_s", "right", "join1_i", "0", "join2_s", "a", "ident_s", "right_1", "join3_i", "0") // 1,15
+        .add(id, "9", "side_s", "right", "join1_i", "0", "join2_s", "a", "ident_s", "right_2", "join3_i", "0") // 1,15
+        .add(id, "10", "side_s", "right", "join1_i", "1", "join2_s", "a", "ident_s", "right_3", "join3_i", "1") // 3
+        .add(id, "11", "side_s", "right", "join1_i", "1", "join2_s", "b", "ident_s", "right_4", "join3_i", "1") // 4
+        .add(id, "12", "side_s", "right", "join1_i", "1", "join2_s", "c", "ident_s", "right_5", "join3_i", "1") // 5
+        .add(id, "13", "side_s", "right", "join1_i", "2", "join2_s", "dad", "ident_s", "right_6", "join3_i", "2")
+        .add(id, "14", "side_s", "right", "join1_i", "3", "join2_s", "e", "ident_s", "right_7", "join3_i", "3") // 7
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    StreamExpression expression;
+    TupleStream stream;
+    List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+    
+    StreamFactory factory = new StreamFactory()
+      .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+      .withFunctionName("search", CloudSolrStream.class)
+      .withFunctionName("leftOuterJoin", LeftOuterJoinStream.class);
+    
+    // Basic test
+    try {
+      expression = StreamExpressionParser.parse("leftOuterJoin("
+          + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
+          + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc\"),"
+          + "on=\"join1_i=join1_i, join2_s=join2_s\")");
+      stream = new LeftOuterJoinStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assertEquals(10, tuples.size());
+      assertOrder(tuples, 1, 1, 15, 15, 2, 3, 4, 5, 6, 7);
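+      // Same eight matches as the inner join, plus the unmatched left tuples
+      // (ids 2 and 6), which a left outer join retains.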
+
+      // Basic desc
+      expression = StreamExpressionParser.parse("leftOuterJoin("
+          + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+          + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+          + "on=\"join1_i=join1_i, join2_s=join2_s\")");
+      stream = new LeftOuterJoinStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assertEquals(10, tuples.size());
+      assertOrder(tuples, 7, 6, 3, 4, 5, 1, 1, 15, 15, 2);
+
+      // Results in both searches, no join matches
+      expression = StreamExpressionParser.parse("leftOuterJoin("
+          + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\"),"
+          + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\", aliases=\"id=right.id, join1_i=right.join1_i, join2_s=right.join2_s, ident_s=right.ident_s\"),"
+          + "on=\"ident_s=right.ident_s\")");
+      stream = new LeftOuterJoinStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assertEquals(8, tuples.size());
+      assertOrder(tuples, 1, 15, 2, 3, 4, 5, 6, 7);
+
+      // Differing field names
+      expression = StreamExpressionParser.parse("leftOuterJoin("
+          + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
+          + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"join3_i,join2_s,ident_s\", sort=\"join3_i asc, join2_s asc\", aliases=\"join3_i=aliasesField\"),"
+          + "on=\"join1_i=aliasesField, join2_s=join2_s\")");
+      stream = new LeftOuterJoinStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assertEquals(10, tuples.size());
+      assertOrder(tuples, 1, 1, 15, 15, 2, 3, 4, 5, 6, 7);
+    } finally {
+      solrClientCache.close();
+    }
+  }
+
+  @Test
+  public void testHashJoinStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "1", "side_s", "left", "join1_i", "0", "join2_s", "a", "ident_s", "left_1") // 8, 9
+        .add(id, "15", "side_s", "left", "join1_i", "0", "join2_s", "a", "ident_s", "left_1") // 8, 9
+        .add(id, "2", "side_s", "left", "join1_i", "0", "join2_s", "b", "ident_s", "left_2")
+        .add(id, "3", "side_s", "left", "join1_i", "1", "join2_s", "a", "ident_s", "left_3") // 10
+        .add(id, "4", "side_s", "left", "join1_i", "1", "join2_s", "b", "ident_s", "left_4") // 11
+        .add(id, "5", "side_s", "left", "join1_i", "1", "join2_s", "c", "ident_s", "left_5") // 12
+        .add(id, "6", "side_s", "left", "join1_i", "2", "join2_s", "d", "ident_s", "left_6")
+        .add(id, "7", "side_s", "left", "join1_i", "3", "join2_s", "e", "ident_s", "left_7") // 14
+
+        .add(id, "8", "side_s", "right", "join1_i", "0", "join2_s", "a", "ident_s", "right_1", "join3_i", "0") // 1,15
+        .add(id, "9", "side_s", "right", "join1_i", "0", "join2_s", "a", "ident_s", "right_2", "join3_i", "0") // 1,15
+        .add(id, "10", "side_s", "right", "join1_i", "1", "join2_s", "a", "ident_s", "right_3", "join3_i", "1") // 3
+        .add(id, "11", "side_s", "right", "join1_i", "1", "join2_s", "b", "ident_s", "right_4", "join3_i", "1") // 4
+        .add(id, "12", "side_s", "right", "join1_i", "1", "join2_s", "c", "ident_s", "right_5", "join3_i", "1") // 5
+        .add(id, "13", "side_s", "right", "join1_i", "2", "join2_s", "dad", "ident_s", "right_6", "join3_i", "2")
+        .add(id, "14", "side_s", "right", "join1_i", "3", "join2_s", "e", "ident_s", "right_7", "join3_i", "3") // 7
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    StreamExpression expression;
+    TupleStream stream;
+    List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+    
+    StreamFactory factory = new StreamFactory()
+      .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+      .withFunctionName("search", CloudSolrStream.class)
+      .withFunctionName("hashJoin", HashJoinStream.class);
+    try {
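+      // hashJoin() reads the hashed (right-hand) stream fully into memory, keyed on the
+      // "on" fields, so the two streams do not need compatible sort orders.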
+      // Basic test
+      expression = StreamExpressionParser.parse("hashJoin("
+          + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
+          + "hashed=search(collection1, q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc\"),"
+          + "on=\"join1_i, join2_s\")");
+      stream = new HashJoinStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assertEquals(8, tuples.size());
+      assertOrder(tuples, 1, 1, 15, 15, 3, 4, 5, 7);
+
+      // Basic desc
+      expression = StreamExpressionParser.parse("hashJoin("
+          + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+          + "hashed=search(collection1, q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+          + "on=\"join1_i, join2_s\")");
+      stream = new HashJoinStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples

<TRUNCATED>