You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2017/04/11 19:36:17 UTC

[1/4] lucene-solr:master: SOLR-10274: The search Streaming Expression should work in non-SolrCloud mode

Repository: lucene-solr
Updated Branches:
  refs/heads/master 57c583718 -> 5ebd41d13


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/06a55b73/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
index 2f2273e..0de3aa0 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
@@ -131,13 +131,20 @@ public void testUniqueStream() throws Exception {
       .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
       .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-  SolrParams sParams = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
-  CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
-  UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
-  List<Tuple> tuples = getTuples(ustream);
-  assertEquals(4, tuples.size());
-  assertOrder(tuples, 0,1,3,4);
-
+  StreamContext streamContext = new StreamContext();
+  SolrClientCache solrClientCache = new SolrClientCache();
+  streamContext.setSolrClientCache(solrClientCache);
+  try {
+    SolrParams sParams = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
+    UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
+    ustream.setStreamContext(streamContext);
+    List<Tuple> tuples = getTuples(ustream);
+    assertEquals(4, tuples.size());
+    assertOrder(tuples, 0, 1, 3, 4);
+  } finally {
+    solrClientCache.close();
+  }
 }
 
 @Test
@@ -167,15 +174,22 @@ public void testNonePartitionKeys() throws Exception {
       .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
       .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
       .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+  StreamContext streamContext = new StreamContext();
+  SolrClientCache solrClientCache = new SolrClientCache();
+  streamContext.setSolrClientCache(solrClientCache);
+  try {
 
-  SolrParams sParamsA = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none");
-  CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-  ParallelStream pstream = parallelStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
-  attachStreamFactory(pstream);
-  List<Tuple> tuples = getTuples(pstream);
-
-  assert(tuples.size() == (10 * numWorkers)); // Each tuple will be double counted.
+    SolrParams sParamsA = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+    ParallelStream pstream = parallelStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+    attachStreamFactory(pstream);
+    pstream.setStreamContext(streamContext);
+    List<Tuple> tuples = getTuples(pstream);
 
+    assert (tuples.size() == (10 * numWorkers)); // Each tuple will be double counted.
+  } finally {
+    solrClientCache.close();
+  }
 }
 
 @Test
@@ -193,19 +207,29 @@ public void testParallelUniqueStream() throws Exception {
       .add(id, "8", "a_s", "hello1", "a_i", "13", "a_f", "4")
       .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-  SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc", "partitionKeys", "a_f");
-  CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
-  UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
-  ParallelStream pstream = parallelStream(ustream, new FieldComparator("a_f", ComparatorOrder.ASCENDING));
-  attachStreamFactory(pstream);
-  List<Tuple> tuples = getTuples(pstream);
-  assertEquals(5, tuples.size());
-  assertOrder(tuples, 0, 1, 3, 4, 6);
+  StreamContext streamContext = new StreamContext();
+  SolrClientCache solrClientCache = new SolrClientCache();
+  streamContext.setSolrClientCache(solrClientCache);
+
+  try {
 
-  //Test the eofTuples
+    SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc", "partitionKeys", "a_f");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
+    UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
+    ParallelStream pstream = parallelStream(ustream, new FieldComparator("a_f", ComparatorOrder.ASCENDING));
+    attachStreamFactory(pstream);
+    pstream.setStreamContext(streamContext);
+    List<Tuple> tuples = getTuples(pstream);
+    assertEquals(5, tuples.size());
+    assertOrder(tuples, 0, 1, 3, 4, 6);
 
-  Map<String,Tuple> eofTuples = pstream.getEofTuples();
-  assertEquals(numWorkers, eofTuples.size()); //There should be an EOF tuple for each worker.
+    //Test the eofTuples
+
+    Map<String, Tuple> eofTuples = pstream.getEofTuples();
+    assertEquals(numWorkers, eofTuples.size()); //There should be an EOF tuple for each worker.
+  }finally {
+    solrClientCache.close();
+  }
 
 }
 
@@ -226,12 +250,21 @@ public void testMultipleFqClauses() throws Exception {
 
   streamFactory.withCollectionZkHost(COLLECTIONORALIAS, zkHost);
 
-  ModifiableSolrParams params = new ModifiableSolrParams(mapParams("q", "*:*", "fl", "id,a_i", 
-      "sort", "a_i asc", "fq", "a_ss:hello0", "fq", "a_ss:hello1"));
-  CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, params);
-  List<Tuple> tuples = getTuples(stream);
-  assertEquals("Multiple fq clauses should have been honored", 1, tuples.size());
-  assertEquals("should only have gotten back document 0", "0", tuples.get(0).getString("id"));
+  StreamContext streamContext = new StreamContext();
+  SolrClientCache solrClientCache = new SolrClientCache();
+  streamContext.setSolrClientCache(solrClientCache);
+
+  try {
+    ModifiableSolrParams params = new ModifiableSolrParams(mapParams("q", "*:*", "fl", "id,a_i",
+        "sort", "a_i asc", "fq", "a_ss:hello0", "fq", "a_ss:hello1"));
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, params);
+    stream.setStreamContext(streamContext);
+    List<Tuple> tuples = getTuples(stream);
+    assertEquals("Multiple fq clauses should have been honored", 1, tuples.size());
+    assertEquals("should only have gotten back document 0", "0", tuples.get(0).getString("id"));
+  } finally {
+    solrClientCache.close();
+  }
 }
 
 @Test
@@ -245,15 +278,20 @@ public void testRankStream() throws Exception {
       .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
       .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-
-  SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
-  CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
-  RankStream rstream = new RankStream(stream, 3, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
-  List<Tuple> tuples = getTuples(rstream);
-
-  assertEquals(3, tuples.size());
-  assertOrder(tuples, 4,3,2);
-
+  StreamContext streamContext = new StreamContext();
+  SolrClientCache solrClientCache = new SolrClientCache();
+  streamContext.setSolrClientCache(solrClientCache);
+  try {
+    SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
+    RankStream rstream = new RankStream(stream, 3, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
+    rstream.setStreamContext(streamContext);
+    List<Tuple> tuples = getTuples(rstream);
+    assertEquals(3, tuples.size());
+    assertOrder(tuples, 4, 3, 2);
+  } finally {
+    solrClientCache.close();
+  }
 }
 
 @Test
@@ -272,22 +310,30 @@ public void testParallelRankStream() throws Exception {
       .add(id, "10", "a_s", "hello1", "a_i", "10", "a_f", "1")
       .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-  SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
-  CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
-  RankStream rstream = new RankStream(stream, 11, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
-  ParallelStream pstream = parallelStream(rstream, new FieldComparator("a_i", ComparatorOrder.DESCENDING));    
-  attachStreamFactory(pstream);
-  List<Tuple> tuples = getTuples(pstream);
+  StreamContext streamContext = new StreamContext();
+  SolrClientCache solrClientCache = new SolrClientCache();
+  streamContext.setSolrClientCache(solrClientCache);
+  try {
+    SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
+    RankStream rstream = new RankStream(stream, 11, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
+    ParallelStream pstream = parallelStream(rstream, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
+    attachStreamFactory(pstream);
+    pstream.setStreamContext(streamContext);
+    List<Tuple> tuples = getTuples(pstream);
 
-  assertEquals(10, tuples.size());
-  assertOrder(tuples, 10,9,8,7,6,5,4,3,2,0);
+    assertEquals(10, tuples.size());
+    assertOrder(tuples, 10, 9, 8, 7, 6, 5, 4, 3, 2, 0);
+  } finally {
+    solrClientCache.close();
+  }
 
 }
 
 @Test
-public void testTrace() throws Exception {
+  public void testTrace() throws Exception {
 
-  new UpdateRequest()
+    new UpdateRequest()
       .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
       .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
       .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
@@ -300,15 +346,24 @@ public void testTrace() throws Exception {
       .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
       .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-  //Test with spaces in the parameter lists.
-  SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i,a_f", "sort", "a_s asc,a_f   asc");
-  CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-  stream.setTrace(true);
-  List<Tuple> tuples = getTuples(stream);
-    assertEquals(COLLECTIONORALIAS, tuples.get(0).get("_COLLECTION_"));
-    assertEquals(COLLECTIONORALIAS, tuples.get(1).get("_COLLECTION_"));
-    assertEquals(COLLECTIONORALIAS, tuples.get(2).get("_COLLECTION_"));
-    assertEquals(COLLECTIONORALIAS, tuples.get(3).get("_COLLECTION_"));
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    try {
+      //Test with spaces in the parameter lists.
+      SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i,a_f", "sort", "a_s asc,a_f   asc");
+      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+      stream.setTrace(true);
+      stream.setStreamContext(streamContext);
+      List<Tuple> tuples = getTuples(stream);
+      assertEquals(COLLECTIONORALIAS, tuples.get(0).get("_COLLECTION_"));
+      assertEquals(COLLECTIONORALIAS, tuples.get(1).get("_COLLECTION_"));
+      assertEquals(COLLECTIONORALIAS, tuples.get(2).get("_COLLECTION_"));
+      assertEquals(COLLECTIONORALIAS, tuples.get(3).get("_COLLECTION_"));
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -327,52 +382,60 @@ public void testTrace() throws Exception {
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-    //Test with spaces in the parameter lists.
-    SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i,  a_f", "sort", "a_s asc  ,  a_f   asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-    ReducerStream rstream  = new ReducerStream(stream,
-                                               new FieldEqualitor("a_s"),
-                                               new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
-
-    List<Tuple> tuples = getTuples(rstream);
-
-    assertEquals(3, tuples.size());
-
-    Tuple t0 = tuples.get(0);
-    List<Map> maps0 = t0.getMaps("group");
-    assertMaps(maps0, 0, 2, 1, 9);
-
-    Tuple t1 = tuples.get(1);
-    List<Map> maps1 = t1.getMaps("group");
-    assertMaps(maps1, 3, 5, 7, 8);
-
-    Tuple t2 = tuples.get(2);
-    List<Map> maps2 = t2.getMaps("group");
-    assertMaps(maps2, 4, 6);
-
-    //Test with spaces in the parameter lists using a comparator
-    sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i,  a_f", "sort", "a_s asc  ,  a_f   asc");
-    stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-    rstream = new ReducerStream(stream,
-                                new FieldComparator("a_s", ComparatorOrder.ASCENDING),
-                                new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
-
-    tuples = getTuples(rstream);
-
-    assertEquals(3, tuples.size());
-
-    t0 = tuples.get(0);
-    maps0 = t0.getMaps("group");
-    assertMaps(maps0, 9, 1, 2, 0);
-
-    t1 = tuples.get(1);
-    maps1 = t1.getMaps("group");
-    assertMaps(maps1, 8, 7, 5, 3);
-
-    t2 = tuples.get(2);
-    maps2 = t2.getMaps("group");
-    assertMaps(maps2, 6, 4);
-
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    try {
+      //Test with spaces in the parameter lists.
+      SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i,  a_f", "sort", "a_s asc  ,  a_f   asc");
+      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+      ReducerStream rstream = new ReducerStream(stream,
+          new FieldEqualitor("a_s"),
+          new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
+
+      rstream.setStreamContext(streamContext);
+      List<Tuple> tuples = getTuples(rstream);
+
+      assertEquals(3, tuples.size());
+
+      Tuple t0 = tuples.get(0);
+      List<Map> maps0 = t0.getMaps("group");
+      assertMaps(maps0, 0, 2, 1, 9);
+
+      Tuple t1 = tuples.get(1);
+      List<Map> maps1 = t1.getMaps("group");
+      assertMaps(maps1, 3, 5, 7, 8);
+
+      Tuple t2 = tuples.get(2);
+      List<Map> maps2 = t2.getMaps("group");
+      assertMaps(maps2, 4, 6);
+
+      //Test with spaces in the parameter lists using a comparator
+      sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i,  a_f", "sort", "a_s asc  ,  a_f   asc");
+      stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+      rstream = new ReducerStream(stream,
+          new FieldComparator("a_s", ComparatorOrder.ASCENDING),
+          new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
+      rstream.setStreamContext(streamContext);
+      tuples = getTuples(rstream);
+
+      assertEquals(3, tuples.size());
+
+      t0 = tuples.get(0);
+      maps0 = t0.getMaps("group");
+      assertMaps(maps0, 9, 1, 2, 0);
+
+      t1 = tuples.get(1);
+      maps1 = t1.getMaps("group");
+      assertMaps(maps1, 8, 7, 5, 3);
+
+      t2 = tuples.get(2);
+      maps2 = t2.getMaps("group");
+      assertMaps(maps2, 6, 4);
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -392,17 +455,24 @@ public void testTrace() throws Exception {
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-    //Test with spaces in the parameter lists.
-    SolrParams sParamsA = mapParams("q", "blah", "fl", "id,a_s, a_i,  a_f", "sort", "a_s asc  ,  a_f   asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-    ReducerStream rstream = new ReducerStream(stream,
-                                              new FieldEqualitor("a_s"),
-                                              new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
-
-    List<Tuple> tuples = getTuples(rstream);
-
-    assertEquals(0, tuples.size());
-
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    try {
+      //Test with spaces in the parameter lists.
+      SolrParams sParamsA = mapParams("q", "blah", "fl", "id,a_s, a_i,  a_f", "sort", "a_s asc  ,  a_f   asc");
+      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+      ReducerStream rstream = new ReducerStream(stream,
+          new FieldEqualitor("a_s"),
+          new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
+      rstream.setStreamContext(streamContext);
+      List<Tuple> tuples = getTuples(rstream);
+
+      assertEquals(0, tuples.size());
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -421,56 +491,65 @@ public void testTrace() throws Exception {
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-    SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
 
-    ReducerStream rstream = new ReducerStream(stream,
-                                              new FieldEqualitor("a_s"),
-                                              new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
-    ParallelStream pstream = parallelStream(rstream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));    
-    attachStreamFactory(pstream);
-    List<Tuple> tuples = getTuples(pstream);
+    try {
+      SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
+      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
 
-    assertEquals(3, tuples.size());
+      ReducerStream rstream = new ReducerStream(stream,
+          new FieldEqualitor("a_s"),
+          new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
+      ParallelStream pstream = parallelStream(rstream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+      attachStreamFactory(pstream);
+      pstream.setStreamContext(streamContext);
+      List<Tuple> tuples = getTuples(pstream);
 
-    Tuple t0 = tuples.get(0);
-    List<Map> maps0 = t0.getMaps("group");
-    assertMaps(maps0, 9, 1, 2, 0);
+      assertEquals(3, tuples.size());
 
-    Tuple t1 = tuples.get(1);
-    List<Map> maps1 = t1.getMaps("group");
-    assertMaps(maps1, 8, 7, 5, 3);
+      Tuple t0 = tuples.get(0);
+      List<Map> maps0 = t0.getMaps("group");
+      assertMaps(maps0, 9, 1, 2, 0);
 
-    Tuple t2 = tuples.get(2);
-    List<Map> maps2 = t2.getMaps("group");
-    assertMaps(maps2, 6, 4);
+      Tuple t1 = tuples.get(1);
+      List<Map> maps1 = t1.getMaps("group");
+      assertMaps(maps1, 8, 7, 5, 3);
 
-    //Test Descending with Ascending subsort
+      Tuple t2 = tuples.get(2);
+      List<Map> maps2 = t2.getMaps("group");
+      assertMaps(maps2, 6, 4);
 
-    sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
-    stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+      //Test Descending with Ascending subsort
 
-    rstream = new ReducerStream(stream,
-                                new FieldEqualitor("a_s"),
-                                new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 3));
-    pstream = parallelStream(rstream, new FieldComparator("a_s", ComparatorOrder.DESCENDING));
-    attachStreamFactory(pstream);
-    tuples = getTuples(pstream);
+      sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
+      stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
 
-    assertEquals(3, tuples.size());
+      rstream = new ReducerStream(stream,
+          new FieldEqualitor("a_s"),
+          new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 3));
+      pstream = parallelStream(rstream, new FieldComparator("a_s", ComparatorOrder.DESCENDING));
+      attachStreamFactory(pstream);
+      pstream.setStreamContext(streamContext);
+      tuples = getTuples(pstream);
 
-    t0 = tuples.get(0);
-    maps0 = t0.getMaps("group");
-    assertMaps(maps0, 4, 6);
+      assertEquals(3, tuples.size());
 
-    t1 = tuples.get(1);
-    maps1 = t1.getMaps("group");
-    assertMaps(maps1, 3, 5, 7);
+      t0 = tuples.get(0);
+      maps0 = t0.getMaps("group");
+      assertMaps(maps0, 4, 6);
 
-    t2 = tuples.get(2);
-    maps2 = t2.getMaps("group");
-    assertMaps(maps2, 0, 2, 1);
+      t1 = tuples.get(1);
+      maps1 = t1.getMaps("group");
+      assertMaps(maps1, 3, 5, 7);
 
+      t2 = tuples.get(2);
+      maps2 = t2.getMaps("group");
+      assertMaps(maps2, 0, 2, 1);
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -490,24 +569,33 @@ public void testTrace() throws Exception {
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
     //Test an error that comes originates from the /select handler
-    SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-    ExceptionStream estream = new ExceptionStream(stream);
-    Tuple t = getTuple(estream);
-    assertTrue(t.EOF);
-    assertTrue(t.EXCEPTION);
-    assertTrue(t.getException().contains("sort param field can't be found: blah"));
-
-    //Test an error that comes originates from the /export handler
-    sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt", "/export");
-    stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-    estream = new ExceptionStream(stream);
-    t = getTuple(estream);
-    assertTrue(t.EOF);
-    assertTrue(t.EXCEPTION);
-    //The /export handler will pass through a real exception.
-    assertTrue(t.getException().contains("undefined field:"));
+    try {
+      SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc");
+      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+      ExceptionStream estream = new ExceptionStream(stream);
+      estream.setStreamContext(streamContext);
+      Tuple t = getTuple(estream);
+      assertTrue(t.EOF);
+      assertTrue(t.EXCEPTION);
+      assertTrue(t.getException().contains("sort param field can't be found: blah"));
+
+      //Test an error that comes originates from the /export handler
+      sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt", "/export");
+      stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+      estream = new ExceptionStream(stream);
+      estream.setStreamContext(streamContext);
+      t = getTuple(estream);
+      assertTrue(t.EOF);
+      assertTrue(t.EXCEPTION);
+      //The /export handler will pass through a real exception.
+      assertTrue(t.getException().contains("undefined field:"));
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -577,48 +665,55 @@ public void testTrace() throws Exception {
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-    SolrParams sParamsA = mapParams("q", "*:*");
-
-    Metric[] metrics = {new SumMetric("a_i"),
-                        new SumMetric("a_f"),
-                        new MinMetric("a_i"),
-                        new MinMetric("a_f"),
-                        new MaxMetric("a_i"),
-                        new MaxMetric("a_f"),
-                        new MeanMetric("a_i"),
-                        new MeanMetric("a_f"),
-                        new CountMetric()};
-
-    StatsStream statsStream = new StatsStream(zkHost, COLLECTIONORALIAS, sParamsA, metrics);
-
-    List<Tuple> tuples = getTuples(statsStream);
-
-    assertEquals(1, tuples.size());
-
-    //Test Long and Double Sums
-
-    Tuple tuple = tuples.get(0);
-
-    Double sumi = tuple.getDouble("sum(a_i)");
-    Double sumf = tuple.getDouble("sum(a_f)");
-    Double mini = tuple.getDouble("min(a_i)");
-    Double minf = tuple.getDouble("min(a_f)");
-    Double maxi = tuple.getDouble("max(a_i)");
-    Double maxf = tuple.getDouble("max(a_f)");
-    Double avgi = tuple.getDouble("avg(a_i)");
-    Double avgf = tuple.getDouble("avg(a_f)");
-    Double count = tuple.getDouble("count(*)");
-
-    assertEquals(70, sumi.longValue());
-    assertEquals(55.0, sumf.doubleValue(), 0.01);
-    assertEquals(0.0, mini.doubleValue(), 0.01);
-    assertEquals(1.0, minf.doubleValue(), 0.01);
-    assertEquals(14.0, maxi.doubleValue(), 0.01);
-    assertEquals(10.0, maxf.doubleValue(), 0.01);
-    assertEquals(7.0, avgi.doubleValue(), .01);
-    assertEquals(5.5, avgf.doubleValue(), .001);
-    assertEquals(10, count.doubleValue(), .01);
-
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    try {
+      SolrParams sParamsA = mapParams("q", "*:*");
+
+      Metric[] metrics = {new SumMetric("a_i"),
+          new SumMetric("a_f"),
+          new MinMetric("a_i"),
+          new MinMetric("a_f"),
+          new MaxMetric("a_i"),
+          new MaxMetric("a_f"),
+          new MeanMetric("a_i"),
+          new MeanMetric("a_f"),
+          new CountMetric()};
+
+      StatsStream statsStream = new StatsStream(zkHost, COLLECTIONORALIAS, sParamsA, metrics);
+      statsStream.setStreamContext(streamContext);
+      List<Tuple> tuples = getTuples(statsStream);
+
+      assertEquals(1, tuples.size());
+
+      //Test Long and Double Sums
+
+      Tuple tuple = tuples.get(0);
+
+      Double sumi = tuple.getDouble("sum(a_i)");
+      Double sumf = tuple.getDouble("sum(a_f)");
+      Double mini = tuple.getDouble("min(a_i)");
+      Double minf = tuple.getDouble("min(a_f)");
+      Double maxi = tuple.getDouble("max(a_i)");
+      Double maxf = tuple.getDouble("max(a_f)");
+      Double avgi = tuple.getDouble("avg(a_i)");
+      Double avgf = tuple.getDouble("avg(a_f)");
+      Double count = tuple.getDouble("count(*)");
+
+      assertEquals(70, sumi.longValue());
+      assertEquals(55.0, sumf.doubleValue(), 0.01);
+      assertEquals(0.0, mini.doubleValue(), 0.01);
+      assertEquals(1.0, minf.doubleValue(), 0.01);
+      assertEquals(14.0, maxi.doubleValue(), 0.01);
+      assertEquals(10.0, maxf.doubleValue(), 0.01);
+      assertEquals(7.0, avgi.doubleValue(), .01);
+      assertEquals(5.5, avgf.doubleValue(), .001);
+      assertEquals(10, count.doubleValue(), .01);
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -637,344 +732,352 @@ public void testTrace() throws Exception {
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-    SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
-
-    Bucket[] buckets =  {new Bucket("a_s")};
-
-    Metric[] metrics = {new SumMetric("a_i"),
-                        new SumMetric("a_f"),
-                        new MinMetric("a_i"),
-                        new MinMetric("a_f"),
-                        new MaxMetric("a_i"),
-                        new MaxMetric("a_f"),
-                        new MeanMetric("a_i"),
-                        new MeanMetric("a_f"),
-                        new CountMetric()};
-
-    FieldComparator[] sorts = {new FieldComparator("sum(a_i)",
-                                                   ComparatorOrder.ASCENDING)};
-
-    FacetStream facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
-
-    List<Tuple> tuples = getTuples(facetStream);
-
-    assert(tuples.size() == 3);
-
-    //Test Long and Double Sums
-
-    Tuple tuple = tuples.get(0);
-    String bucket = tuple.getString("a_s");
-    Double sumi = tuple.getDouble("sum(a_i)");
-    Double sumf = tuple.getDouble("sum(a_f)");
-    Double mini = tuple.getDouble("min(a_i)");
-    Double minf = tuple.getDouble("min(a_f)");
-    Double maxi = tuple.getDouble("max(a_i)");
-    Double maxf = tuple.getDouble("max(a_f)");
-    Double avgi = tuple.getDouble("avg(a_i)");
-    Double avgf = tuple.getDouble("avg(a_f)");
-    Double count = tuple.getDouble("count(*)");
-
-    assertEquals("hello4", bucket);
-    assertEquals(15, sumi.longValue());
-    assertEquals(11.0, sumf.doubleValue(), 0.01);
-    assertEquals(4.0, mini.doubleValue(), 0.01);
-    assertEquals(4.0, minf.doubleValue(), 0.01);
-    assertEquals(11.0, maxi.doubleValue(), 0.01);
-    assertEquals(7.0, maxf.doubleValue(), 0.01);
-    assertEquals(7.5, avgi.doubleValue(), 0.01);
-    assertEquals(5.5, avgf.doubleValue(), 0.01);
-    assertEquals(2, count.doubleValue(), 0.01);
-
-    tuple = tuples.get(1);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello0", bucket);
-    assertEquals(17, sumi.doubleValue(), .01);
-    assertEquals(18, sumf.doubleValue(), .01);
-    assertEquals(0.0, mini.doubleValue(), .01);
-    assertEquals(1.0, minf.doubleValue(), .01);
-    assertEquals(14.0, maxi.doubleValue(), .01);
-    assertEquals(10.0, maxf.doubleValue(), .01);
-    assertEquals(4.25, avgi.doubleValue(), .01);
-    assertEquals(4.5, avgf.doubleValue(), .01);
-    assertEquals(4, count.doubleValue(), .01);
-
-    tuple = tuples.get(2);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello3", bucket);
-    assertEquals(38.0, sumi.doubleValue(), 0.01);
-    assertEquals(26.0, sumf.doubleValue(), 0.01);
-    assertEquals(3.0, mini.doubleValue(), 0.01);
-    assertEquals(3.0, minf.doubleValue(), 0.01);
-    assertEquals(13.0, maxi.doubleValue(), 0.01);
-    assertEquals(9.0, maxf.doubleValue(), 0.01);
-    assertEquals(9.5, avgi.doubleValue(), 0.01);
-    assertEquals(6.5, avgf.doubleValue(), 0.01);
-    assertEquals(4, count.doubleValue(), 0.01);
-
-
-    //Reverse the Sort.
-
-    sorts[0] = new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING);
-
-    facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
-
-    tuples = getTuples(facetStream);
-
-    assertEquals(3, tuples.size());
-
-    //Test Long and Double Sums
-
-    tuple = tuples.get(0);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello3", bucket);
-    assertEquals(38, sumi.doubleValue(), 0.1);
-    assertEquals(26, sumf.doubleValue(), 0.1);
-    assertEquals(3, mini.doubleValue(), 0.1);
-    assertEquals(3, minf.doubleValue(), 0.1);
-    assertEquals(13, maxi.doubleValue(), 0.1);
-    assertEquals(9, maxf.doubleValue(), 0.1);
-    assertEquals(9.5, avgi.doubleValue(), 0.1);
-    assertEquals(6.5, avgf.doubleValue(), 0.1);
-    assertEquals(4, count.doubleValue(), 0.1);
-
-    tuple = tuples.get(1);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello0", bucket);
-    assertEquals(17, sumi.doubleValue(), 0.01);
-    assertEquals(18, sumf.doubleValue(), 0.01);
-    assertEquals(0, mini.doubleValue(), 0.01);
-    assertEquals(1, minf.doubleValue(), 0.01);
-    assertEquals(14, maxi.doubleValue(), 0.01);
-    assertEquals(10, maxf.doubleValue(), 0.01);
-    assertEquals(4.25, avgi.doubleValue(), 0.01);
-    assertEquals(4.5, avgf.doubleValue(), 0.01);
-    assertEquals(4, count.doubleValue(), 0.01);
-
-    tuple = tuples.get(2);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello4", bucket);
-    assertEquals(15, sumi.longValue());
-    assertEquals(11, sumf.doubleValue(), 0.01);
-    assertEquals(4.0, mini.doubleValue(), 0.01);
-    assertEquals(4.0, minf.doubleValue(), 0.01);
-    assertEquals(11.0, maxi.doubleValue(), 0.01);
-    assertEquals(7.0, maxf.doubleValue(), 0.01);
-    assertEquals(7.5, avgi.doubleValue(), 0.01);
-    assertEquals(5.5, avgf.doubleValue(), 0.01);
-    assertEquals(2, count.doubleValue(), 0.01);
-
-
-    //Test index sort
-
-    sorts[0] = new FieldComparator("a_s", ComparatorOrder.DESCENDING);
-
-
-    facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
-
-    tuples = getTuples(facetStream);
-
-    assertEquals(3, tuples.size());
-
-
-    tuple = tuples.get(0);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-
-    assertEquals("hello4", bucket);
-    assertEquals(15, sumi.longValue());
-    assertEquals(11, sumf.doubleValue(), 0.01);
-    assertEquals(4, mini.doubleValue(), 0.01);
-    assertEquals(4, minf.doubleValue(), 0.01);
-    assertEquals(11, maxi.doubleValue(), 0.01);
-    assertEquals(7, maxf.doubleValue(), 0.01);
-    assertEquals(7.5, avgi.doubleValue(), 0.01);
-    assertEquals(5.5, avgf.doubleValue(), 0.01);
-    assertEquals(2, count.doubleValue(), 0.01);
-
-    tuple = tuples.get(1);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertTrue(bucket.equals("hello3"));
-    assertTrue(sumi.doubleValue() == 38.0D);
-    assertTrue(sumf.doubleValue() == 26.0D);
-    assertTrue(mini.doubleValue() == 3.0D);
-    assertTrue(minf.doubleValue() == 3.0D);
-    assertTrue(maxi.doubleValue() == 13.0D);
-    assertTrue(maxf.doubleValue() == 9.0D);
-    assertTrue(avgi.doubleValue() == 9.5D);
-    assertTrue(avgf.doubleValue() == 6.5D);
-    assertTrue(count.doubleValue() == 4);
-
-    tuple = tuples.get(2);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello0", bucket);
-    assertEquals(17, sumi.doubleValue(), 0.01);
-    assertEquals(18, sumf.doubleValue(), 0.01);
-    assertEquals(0, mini.doubleValue(), 0.01);
-    assertEquals(1, minf.doubleValue(), 0.01);
-    assertEquals(14, maxi.doubleValue(), 0.01);
-    assertEquals(10, maxf.doubleValue(), 0.01);
-    assertEquals(4.25, avgi.doubleValue(), 0.01);
-    assertEquals(4.5, avgf.doubleValue(), 0.01);
-    assertEquals(4, count.doubleValue(), 0.01);
-
-    //Test index sort
-
-    sorts[0] = new FieldComparator("a_s", ComparatorOrder.ASCENDING);
-
-    facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
-
-    tuples = getTuples(facetStream);
-
-    assertEquals(3, tuples.size());
-
-    tuple = tuples.get(0);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello0", bucket);
-    assertEquals(17, sumi.doubleValue(), 0.01);
-    assertEquals(18, sumf.doubleValue(), 0.01);
-    assertEquals(0, mini.doubleValue(), 0.01);
-    assertEquals(1, minf.doubleValue(), 0.01);
-    assertEquals(14, maxi.doubleValue(), 0.01);
-    assertEquals(10, maxf.doubleValue(), 0.01);
-    assertEquals(4.25, avgi.doubleValue(), 0.0001);
-    assertEquals(4.5, avgf.doubleValue(), 0.001);
-    assertEquals(4, count.doubleValue(), 0.01);
-
-    tuple = tuples.get(1);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello3", bucket);
-    assertEquals(38, sumi.doubleValue(), 0.01);
-    assertEquals(26, sumf.doubleValue(), 0.01);
-    assertEquals(3, mini.doubleValue(), 0.01);
-    assertEquals(3, minf.doubleValue(), 0.01);
-    assertEquals(13, maxi.doubleValue(), 0.01);
-    assertEquals(9, maxf.doubleValue(), 0.01);
-    assertEquals(9.5, avgi.doubleValue(), 0.01);
-    assertEquals(6.5, avgf.doubleValue(), 0.01);
-    assertEquals(4, count.doubleValue(), 0.01);
-
-    tuple = tuples.get(2);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello4", bucket);
-    assertEquals(15, sumi.longValue());
-    assertEquals(11.0, sumf.doubleValue(), 0.1);
-    assertEquals(4.0, mini.doubleValue(), 0.1);
-    assertEquals(4.0, minf.doubleValue(), 0.1);
-    assertEquals(11.0, maxi.doubleValue(), 0.1);
-    assertEquals(7.0, maxf.doubleValue(), 0.1);
-    assertEquals(7.5, avgi.doubleValue(), 0.1);
-    assertEquals(5.5, avgf.doubleValue(), 0.1);
-    assertEquals(2, count.doubleValue(), 0.1);
-
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    try {
+      SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
+
+      Bucket[] buckets = {new Bucket("a_s")};
+
+      Metric[] metrics = {new SumMetric("a_i"),
+          new SumMetric("a_f"),
+          new MinMetric("a_i"),
+          new MinMetric("a_f"),
+          new MaxMetric("a_i"),
+          new MaxMetric("a_f"),
+          new MeanMetric("a_i"),
+          new MeanMetric("a_f"),
+          new CountMetric()};
+
+      FieldComparator[] sorts = {new FieldComparator("sum(a_i)",
+          ComparatorOrder.ASCENDING)};
+
+      FacetStream facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
+
+      List<Tuple> tuples = getTuples(facetStream);
+
+      assert (tuples.size() == 3);
+
+      //Test Long and Double Sums
+
+      Tuple tuple = tuples.get(0);
+      String bucket = tuple.getString("a_s");
+      Double sumi = tuple.getDouble("sum(a_i)");
+      Double sumf = tuple.getDouble("sum(a_f)");
+      Double mini = tuple.getDouble("min(a_i)");
+      Double minf = tuple.getDouble("min(a_f)");
+      Double maxi = tuple.getDouble("max(a_i)");
+      Double maxf = tuple.getDouble("max(a_f)");
+      Double avgi = tuple.getDouble("avg(a_i)");
+      Double avgf = tuple.getDouble("avg(a_f)");
+      Double count = tuple.getDouble("count(*)");
+
+      assertEquals("hello4", bucket);
+      assertEquals(15, sumi.longValue());
+      assertEquals(11.0, sumf.doubleValue(), 0.01);
+      assertEquals(4.0, mini.doubleValue(), 0.01);
+      assertEquals(4.0, minf.doubleValue(), 0.01);
+      assertEquals(11.0, maxi.doubleValue(), 0.01);
+      assertEquals(7.0, maxf.doubleValue(), 0.01);
+      assertEquals(7.5, avgi.doubleValue(), 0.01);
+      assertEquals(5.5, avgf.doubleValue(), 0.01);
+      assertEquals(2, count.doubleValue(), 0.01);
+
+      tuple = tuples.get(1);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello0", bucket);
+      assertEquals(17, sumi.doubleValue(), .01);
+      assertEquals(18, sumf.doubleValue(), .01);
+      assertEquals(0.0, mini.doubleValue(), .01);
+      assertEquals(1.0, minf.doubleValue(), .01);
+      assertEquals(14.0, maxi.doubleValue(), .01);
+      assertEquals(10.0, maxf.doubleValue(), .01);
+      assertEquals(4.25, avgi.doubleValue(), .01);
+      assertEquals(4.5, avgf.doubleValue(), .01);
+      assertEquals(4, count.doubleValue(), .01);
+
+      tuple = tuples.get(2);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello3", bucket);
+      assertEquals(38.0, sumi.doubleValue(), 0.01);
+      assertEquals(26.0, sumf.doubleValue(), 0.01);
+      assertEquals(3.0, mini.doubleValue(), 0.01);
+      assertEquals(3.0, minf.doubleValue(), 0.01);
+      assertEquals(13.0, maxi.doubleValue(), 0.01);
+      assertEquals(9.0, maxf.doubleValue(), 0.01);
+      assertEquals(9.5, avgi.doubleValue(), 0.01);
+      assertEquals(6.5, avgf.doubleValue(), 0.01);
+      assertEquals(4, count.doubleValue(), 0.01);
+
+
+      //Reverse the Sort.
+
+      sorts[0] = new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING);
+
+      facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
+
+      tuples = getTuples(facetStream);
+
+      assertEquals(3, tuples.size());
+
+      //Test Long and Double Sums
+
+      tuple = tuples.get(0);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello3", bucket);
+      assertEquals(38, sumi.doubleValue(), 0.1);
+      assertEquals(26, sumf.doubleValue(), 0.1);
+      assertEquals(3, mini.doubleValue(), 0.1);
+      assertEquals(3, minf.doubleValue(), 0.1);
+      assertEquals(13, maxi.doubleValue(), 0.1);
+      assertEquals(9, maxf.doubleValue(), 0.1);
+      assertEquals(9.5, avgi.doubleValue(), 0.1);
+      assertEquals(6.5, avgf.doubleValue(), 0.1);
+      assertEquals(4, count.doubleValue(), 0.1);
+
+      tuple = tuples.get(1);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello0", bucket);
+      assertEquals(17, sumi.doubleValue(), 0.01);
+      assertEquals(18, sumf.doubleValue(), 0.01);
+      assertEquals(0, mini.doubleValue(), 0.01);
+      assertEquals(1, minf.doubleValue(), 0.01);
+      assertEquals(14, maxi.doubleValue(), 0.01);
+      assertEquals(10, maxf.doubleValue(), 0.01);
+      assertEquals(4.25, avgi.doubleValue(), 0.01);
+      assertEquals(4.5, avgf.doubleValue(), 0.01);
+      assertEquals(4, count.doubleValue(), 0.01);
+
+      tuple = tuples.get(2);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello4", bucket);
+      assertEquals(15, sumi.longValue());
+      assertEquals(11, sumf.doubleValue(), 0.01);
+      assertEquals(4.0, mini.doubleValue(), 0.01);
+      assertEquals(4.0, minf.doubleValue(), 0.01);
+      assertEquals(11.0, maxi.doubleValue(), 0.01);
+      assertEquals(7.0, maxf.doubleValue(), 0.01);
+      assertEquals(7.5, avgi.doubleValue(), 0.01);
+      assertEquals(5.5, avgf.doubleValue(), 0.01);
+      assertEquals(2, count.doubleValue(), 0.01);
+
+
+      //Test index sort
+
+      sorts[0] = new FieldComparator("a_s", ComparatorOrder.DESCENDING);
+
+
+      facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
+      facetStream.setStreamContext(streamContext);
+
+      tuples = getTuples(facetStream);
+
+      assertEquals(3, tuples.size());
+
+
+      tuple = tuples.get(0);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+
+      assertEquals("hello4", bucket);
+      assertEquals(15, sumi.longValue());
+      assertEquals(11, sumf.doubleValue(), 0.01);
+      assertEquals(4, mini.doubleValue(), 0.01);
+      assertEquals(4, minf.doubleValue(), 0.01);
+      assertEquals(11, maxi.doubleValue(), 0.01);
+      assertEquals(7, maxf.doubleValue(), 0.01);
+      assertEquals(7.5, avgi.doubleValue(), 0.01);
+      assertEquals(5.5, avgf.doubleValue(), 0.01);
+      assertEquals(2, count.doubleValue(), 0.01);
+
+      tuple = tuples.get(1);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertTrue(bucket.equals("hello3"));
+      assertTrue(sumi.doubleValue() == 38.0D);
+      assertTrue(sumf.doubleValue() == 26.0D);
+      assertTrue(mini.doubleValue() == 3.0D);
+      assertTrue(minf.doubleValue() == 3.0D);
+      assertTrue(maxi.doubleValue() == 13.0D);
+      assertTrue(maxf.doubleValue() == 9.0D);
+      assertTrue(avgi.doubleValue() == 9.5D);
+      assertTrue(avgf.doubleValue() == 6.5D);
+      assertTrue(count.doubleValue() == 4);
+
+      tuple = tuples.get(2);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello0", bucket);
+      assertEquals(17, sumi.doubleValue(), 0.01);
+      assertEquals(18, sumf.doubleValue(), 0.01);
+      assertEquals(0, mini.doubleValue(), 0.01);
+      assertEquals(1, minf.doubleValue(), 0.01);
+      assertEquals(14, maxi.doubleValue(), 0.01);
+      assertEquals(10, maxf.doubleValue(), 0.01);
+      assertEquals(4.25, avgi.doubleValue(), 0.01);
+      assertEquals(4.5, avgf.doubleValue(), 0.01);
+      assertEquals(4, count.doubleValue(), 0.01);
+
+      //Test index sort
+
+      sorts[0] = new FieldComparator("a_s", ComparatorOrder.ASCENDING);
+
+      facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
+      facetStream.setStreamContext(streamContext);
+      tuples = getTuples(facetStream);
+
+      assertEquals(3, tuples.size());
+
+      tuple = tuples.get(0);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello0", bucket);
+      assertEquals(17, sumi.doubleValue(), 0.01);
+      assertEquals(18, sumf.doubleValue(), 0.01);
+      assertEquals(0, mini.doubleValue(), 0.01);
+      assertEquals(1, minf.doubleValue(), 0.01);
+      assertEquals(14, maxi.doubleValue(), 0.01);
+      assertEquals(10, maxf.doubleValue(), 0.01);
+      assertEquals(4.25, avgi.doubleValue(), 0.0001);
+      assertEquals(4.5, avgf.doubleValue(), 0.001);
+      assertEquals(4, count.doubleValue(), 0.01);
+
+      tuple = tuples.get(1);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello3", bucket);
+      assertEquals(38, sumi.doubleValue(), 0.01);
+      assertEquals(26, sumf.doubleValue(), 0.01);
+      assertEquals(3, mini.doubleValue(), 0.01);
+      assertEquals(3, minf.doubleValue(), 0.01);
+      assertEquals(13, maxi.doubleValue(), 0.01);
+      assertEquals(9, maxf.doubleValue(), 0.01);
+      assertEquals(9.5, avgi.doubleValue(), 0.01);
+      assertEquals(6.5, avgf.doubleValue(), 0.01);
+      assertEquals(4, count.doubleValue(), 0.01);
+
+      tuple = tuples.get(2);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello4", bucket);
+      assertEquals(15, sumi.longValue());
+      assertEquals(11.0, sumf.doubleValue(), 0.1);
+      assertEquals(4.0, mini.doubleValue(), 0.1);
+      assertEquals(4.0, minf.doubleValue(), 0.1);
+      assertEquals(11.0, maxi.doubleValue(), 0.1);
+      assertEquals(7.0, maxf.doubleValue(), 0.1);
+      assertEquals(7.5, avgi.doubleValue(), 0.1);
+      assertEquals(5.5, avgf.doubleValue(), 0.1);
+      assertEquals(2, count.doubleValue(), 0.1);
+    } finally {
+      solrClientCache.close();
+    }
   }
 
 
@@ -1042,7 +1145,11 @@ public void testTrace() throws Exception {
     List<String> selectOrder = ("asc".equals(sortDir)) ? Arrays.asList(ascOrder) : Arrays.asList(descOrder);
     List<String> selectOrderBool = ("asc".equals(sortDir)) ? Arrays.asList(ascOrderBool) : Arrays.asList(descOrderBool);
     SolrParams exportParams = mapParams("q", "*:*", "qt", "/export", "fl", "id," + field, "sort", field + " " + sortDir + ",id asc");
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
     try (CloudSolrStream solrStream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, exportParams)) {
+      solrStream.setStreamContext(streamContext);
       List<Tuple> tuples = getTuples(solrStream);
       assertEquals("There should be exactly 32 responses returned", 32, tuples.size());
       // Since the getTuples method doesn't return the EOF tuple, these two entries should be the same size.
@@ -1053,6 +1160,8 @@ public void testTrace() throws Exception {
                 "' RESTORE GETTING selectOrder from select statement after LUCENE-7548",
             tuples.get(idx).getString("id"), (field.startsWith("b_") ? selectOrderBool.get(idx) : selectOrder.get(idx)));
       }
+    } finally {
+      solrClientCache.close();
     }
   }
 
@@ -1081,7 +1190,12 @@ public void testTrace() throws Exception {
     }
     SolrParams sParams = mapParams("q", "*:*", "qt", "/export", "fl", fl.toString(), "sort", "id asc");
 
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
     try (CloudSolrStream solrStream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams)) {
+      solrStream.setStreamContext(streamContext);
       List<Tuple> tuples = getTuples(solrStream);
       assertEquals("There should be exactly 32 responses returned", 32, tuples.size());
 
@@ -1097,6 +1211,8 @@ public void testTrace() throws Exception {
           }
         }
       }
+    } finally {
+      solrClientCache.close();
     }
   }
 
@@ -1229,173 +1345,181 @@ public void testTrace() throws Exception {
         .add(id, "9", "level1_s", "hello0", "level2_s", "b", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-    SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_i,a_f");
-
-    Bucket[] buckets =  {new Bucket("level1_s"), new Bucket("level2_s")};
-
-    Metric[] metrics = {new SumMetric("a_i"),
-                        new CountMetric()};
-
-    FieldComparator[] sorts = {new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING), new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING)};
-
-    FacetStream facetStream = new FacetStream(
-        zkHost,
-        COLLECTIONORALIAS,
-        sParamsA,
-        buckets,
-        metrics,
-        sorts,
-        100);
-
-    List<Tuple> tuples = getTuples(facetStream);
-    assertEquals(6, tuples.size());
-
-    Tuple tuple = tuples.get(0);
-    String bucket1 = tuple.getString("level1_s");
-    String bucket2 = tuple.getString("level2_s");
-    Double sumi = tuple.getDouble("sum(a_i)");
-    Double count = tuple.getDouble("count(*)");
-
-    assertEquals("hello3", bucket1);
-    assertEquals("b", bucket2);
-    assertEquals(35, sumi.longValue());
-    assertEquals(3, count, 0.1);
-
-    tuple = tuples.get(1);
-    bucket1 = tuple.getString("level1_s");
-    bucket2 = tuple.getString("level2_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello0", bucket1);
-    assertEquals("b", bucket2);
-    assertEquals(15, sumi.longValue());
-    assertEquals(2, count, 0.1);
-
-    tuple = tuples.get(2);
-    bucket1 = tuple.getString("level1_s");
-    bucket2 = tuple.getString("level2_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello4", bucket1);
-    assertEquals("b", bucket2);
-    assertEquals(11, sumi.longValue());
-    assertEquals(1, count.doubleValue(), 0.1);
-
-    tuple = tuples.get(3);
-    bucket1 = tuple.getString("level1_s");
-    bucket2 = tuple.getString("level2_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello4", bucket1);
-    assertEquals("a", bucket2);
-    assertEquals(4, sumi.longValue());
-    assertEquals(1, count.doubleValue(), 0.1);
-
-    tuple = tuples.get(4);
-    bucket1 = tuple.getString("level1_s");
-    bucket2 = tuple.getString("level2_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello3", bucket1);
-    assertEquals("a", bucket2);
-    assertEquals(3, sumi.longValue());
-    assertEquals(1, count.doubleValue(), 0.1);
-
-    tuple = tuples.get(5);
-    bucket1 = tuple.getString("level1_s");
-    bucket2 = tuple.getString("level2_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello0", bucket1);
-    assertEquals("a", bucket2);
-    assertEquals(2, sumi.longValue());
-    assertEquals(2, count.doubleValue(), 0.1);
-
-    sorts[0] =  new FieldComparator("level1_s", ComparatorOrder.DESCENDING );
-    sorts[1] =  new FieldComparator("level2_s", ComparatorOrder.DESCENDING );
-    facetStream = new FacetStream(
-        zkHost,
-        COLLECTIONORALIAS,
-        sParamsA,
-        buckets,
-        metrics,
-        sorts,
-        100);
-
-    tuples = getTuples(facetStream);
-    assertEquals(6, tuples.size());
-
-    tuple = tuples.get(0);
-    bucket1 = tuple.getString("level1_s");
-    bucket2 = tuple.getString("level2_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello4", bucket1);
-    assertEquals("b", bucket2);
-    assertEquals(11, sumi.longValue());
-    assertEquals(1, count, 0.1);
-
-    tuple = tuples.get(1);
-    bucket1 = tuple.getString("level1_s");
-    bucket2 = tuple.getString("level2_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello4", bucket1);
-    assertEquals("a", bucket2);
-    assertEquals(4, sumi.longValue());
-    assertEquals(1, count.doubleValue(), 0.1);
-
-    tuple = tuples.get(2);
-    bucket1 = tuple.getString("level1_s");
-    bucket2 = tuple.getString("level2_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello3", bucket1);
-    assertEquals("b", bucket2);
-    assertEquals(35, sumi.longValue());
-    assertEquals(3, count.doubleValue(), 0.1);
-
-    tuple = tuples.get(3);
-    bucket1 = tuple.getString("level1_s");
-    bucket2 = tuple.getString("level2_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello3", bucket1);
-    assertEquals("a", bucket2);
-    assertEquals(3, sumi.longValue());
-    assertEquals(1, count.doubleValue(), 0.1);
-
-    tuple = tuples.get(4);
-    bucket1 = tuple.getString("level1_s");
-    bucket2 = tuple.getString("level2_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello0", bucket1);
-    assertEquals("b", bucket2);
-    assertEquals(15, sumi.longValue());
-    assertEquals(2, count.doubleValue(), 0.1);
-
-    tuple = tuples.get(5);
-    bucket1 = tuple.getString("level1_s");
-    bucket2 = tuple.getString("level2_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello0", bucket1);
-    assertEquals("a", bucket2);
-    assertEquals(2, sumi.longValue());
-    assertEquals(2, count.doubleValue(), 0.1);
-
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    try {
+
+      SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_i,a_f");
+
+      Bucket[] buckets = {new Bucket("level1_s"), new Bucket("level2_s")};
+
+      Metric[] metrics = {new SumMetric("a_i"),
+          new CountMetric()};
+
+      FieldComparator[] sorts = {new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING), new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING)};
+
+      FacetStream facetStream = new FacetStream(
+          zkHost,
+          COLLECTIONORALIAS,
+          sParamsA,
+          buckets,
+          metrics,
+          sorts,
+          100);
+      facetStream.setStreamContext(streamContext);
+      List<Tuple> tuples = getTuples(facetStream);
+      assertEquals(6, tuples.size());
+
+      Tuple tuple = tuples.get(0);
+      String bucket1 = tuple.getString("level1_s");
+      String bucket2 = tuple.getString("level2_s");
+      Double sumi = tuple.getDouble("sum(a_i)");
+      Double count = tuple.getDouble("count(*)");
+
+      assertEquals("hello3", bucket1);
+      assertEquals("b", bucket2);
+      assertEquals(35, sumi.longValue());
+      assertEquals(3, count, 0.1);
+
+      tuple = tuples.get(1);
+      bucket1 = tuple.getString("level1_s");
+      bucket2 = tuple.getString("level2_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello0", bucket1);
+      assertEquals("b", bucket2);
+      assertEquals(15, sumi.longValue());
+      assertEquals(2, count, 0.1);
+
+      tuple = tuples.get(2);
+      bucket1 = tuple.getString("level1_s");
+      bucket2 = tuple.getString("level2_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello4", bucket1);
+      assertEquals("b", bucket2);
+      assertEquals(11, sumi.longValue());
+      assertEquals(1, count.doubleValue(), 0.1);
+
+      tuple = tuples.get(3);
+      bucket1 = tuple.getString("level1_s");
+      bucket2 = tuple.getString("level2_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello4", bucket1);
+      assertEquals("a", bucket2);
+      assertEquals(4, sumi.longValue());
+      assertEquals(1, count.doubleValue(), 0.1);
+
+      tuple = tuples.get(4);
+      bucket1 = tuple.getString("level1_s");
+      bucket2 = tuple.getString("level2_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello3", bucket1);
+      assertEquals("a", bucket2);
+      assertEquals(3, sumi.longValue());
+      assertEquals(1, count.doubleValue(), 0.1);
+
+      tuple = tuples.get(5);
+      bucket1 = tuple.getString("level1_s");
+      bucket2 = tuple.getString("level2_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello0", bucket1);
+      assertEquals("a", bucket2);
+      assertEquals(2, sumi.longValue());
+      assertEquals(2, count.doubleValue(), 0.1);
+
+      sorts[0] = new FieldComparator("level1_s", ComparatorOrder.DESCENDING);
+      sorts[1] = new FieldComparator("level2_s", ComparatorOrder.DESCENDING);
+      facetStream = new FacetStream(
+          zkHost,
+          COLLECTIONORALIAS,
+          sParamsA,
+          buckets,
+          metrics,
+          sorts,
+          100);
+      facetStream.setStreamContext(streamContext);
+      tuples = getTuples(facetStream);
+      assertEquals(6, tuples.size());
+
+      tuple = tuples.get(0);
+      bucket1 = tuple.getString("level1_s");
+      bucket2 = tuple.getString("level2_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello4", bucket1);
+      assertEquals("b", bucket2);
+      assertEquals(11, sumi.longValue());
+      assertEquals(1, count, 0.1);
+
+      tuple = tuples.get(1);
+      bucket1 = tuple.getString("level1_s");
+      bucket2 = tuple.getString("level2_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello4", bucket1);
+      assertEquals("a", bucket2);
+      assertEquals(4, sumi.longValue());
+      assertEquals(1, count.doubleValue(), 0.1);
+
+      tuple = tuples.get(2);
+      bucket1 = tuple.getString("level1_s");
+      bucket2 = tuple.getString("level2_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello3", bucket1);
+      assertEquals("b", bucket2);
+      assertEquals(35, sumi.longValue());
+      assertEquals(3, count.doubleValue(), 0.1);
+
+      tuple = tuples.get(3);
+      bucket1 = tuple.getString("level1_s");
+      bucket2 = tuple.getString("level2_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello3", bucket1);
+      assertEquals("a", bucket2);
+      assertEquals(3, sumi.longValue());
+      assertEquals(1, count.doubleValue(), 0.1);
+
+      tuple = tuples.get(4);
+      bucket1 = tuple.getString("level1_s");
+      bucket2 = tuple.getString("level2_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello0", bucket1);
+      assertEquals("b", bucket2);
+      assertEquals(15, sumi.longValue());
+      assertEquals(2, count.doubleValue(), 0.1);
+
+      tuple = tuples.get(5);
+      bucket1 = tuple.getString("level1_s");
+      bucket2 = tuple.getString("level2_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello0", bucket1);
+      assertEquals("a", bucket2);
+      assertEquals(2, sumi.longValue());
+      assertEquals(2, count.doubleValue(), 0.1);
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -1413,166 +1537,174 @@ public void testTrace() throws Exception {
         .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
-
-    SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-
-    Bucket[] buckets =  {new Bucket("a_s")};
-
-    Metric[] metrics = {new SumMetric("a_i"),
-                        new SumMetric("a_f"),
-                        new MinMetric("a_i"),
-                        new MinMetric("a_f"),
-                        new MaxMetric("a_i"),
-                        new MaxMetric("a_f"),
-                        new MeanMetric("a_i"),
-                        new MeanMetric("a_f"),
-                        new CountMetric()};
-
-    RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
-    List<Tuple> tuples = getTuples(rollupStream);
-
-    assert(tuples.size() == 3);
-
-    //Test Long and Double Sums
-
-    Tuple tuple = tuples.get(0);
-    String bucket = tuple.getString("a_s");
-    Double sumi = tuple.getDouble("sum(a_i)");
-    Double sumf = tuple.getDouble("sum(a_f)");
-    Double mini = tuple.getDouble("min(a_i)");
-    Double minf = tuple.getDouble("min(a_f)");
-    Double maxi = tuple.getDouble("max(a_i)");
-    Double maxf = tuple.getDouble("max(a_f)");
-    Double avgi = tuple.getDouble("avg(a_i)");
-    Double avgf = tuple.getDouble("avg(a_f)");
-    Double count = tuple.getDouble("count(*)");
-
-
-    assertEquals("hello0", bucket);
-    assertEquals(17, sumi.doubleValue(), 0.001);
-    assertEquals(18, sumf.doubleValue(), 0.001);
-    assertEquals(0, mini.doubleValue(), 0.001);
-    assertEquals(1, minf.doubleValue(), 0.001);
-    assertEquals(14, maxi.doubleValue(), 0.001);
-    assertEquals(10, maxf.doubleValue(), 0.001);
-    assertEquals(4.25, avgi.doubleValue(), 0.001);
-    assertEquals(4.5, avgf.doubleValue(), 0.001);
-    assertEquals(4, count.doubleValue(), 0.001);
-
-
-    tuple = tuples.get(1);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello3", bucket);
-    assertEquals(38, sumi.doubleValue(), 0.001);
-    assertEquals(26, sumf.doubleValue(), 0.001);
-    assertEquals(3, mini.doubleValue(), 0.001);
-    assertEquals(3, minf.doubleValue(), 0.001);
-    assertEquals(13, maxi.doubleValue(), 0.001);
-    assertEquals(9, maxf.doubleValue(), 0.001);
-    assertEquals(9.5, avgi.doubleValue(), 0.001);
-    assertEquals(6.5, avgf.doubleValue(), 0.001);
-    assertEquals(4, count.doubleValue(), 0.001);
-
-
-    tuple = tuples.get(2);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello4", bucket);
-    assertEquals(15, sumi.longValue());
-    assertEquals(11, sumf.doubleValue(), 0.01);
-    assertEquals(4, mini.doubleValue(), 0.01);
-    assertEquals(4, minf.doubleValue(), 0.01);
-    assertEquals(11, maxi.doubleValue(), 0.01);
-    assertEquals(7, maxf.doubleValue(), 0.01);
-    assertEquals(7.5, avgi.doubleValue(), 0.01);
-    assertEquals(5.5, avgf.doubleValue(), 0.01);
-    assertEquals(2, count.doubleValue(), 0.01);
-
-    // Test will null metrics
-    rollupStream = new RollupStream(stream, buckets, metrics);
-    tuples = getTuples(rollupStream);
-
-    assert(tuples.size() == 3);
-    tuple = tuples.get(0);
-    bucket = tuple.getString("a_s");
-    assertTrue(bucket.equals("hello0"));
-
-    tuple = tuples.get(1);
-    bucket = tuple.getString("a_s");
-    assertTrue(bucket.equals("hello3"));
-
-    tuple = tuples.get(2);
-    bucket = tuple.getString("a_s");
-    assertTrue(bucket.equals("hello4"));
-
-
-    //Test will null value in the grouping field
-    new UpdateRequest()
-        .add(id, "12", "a_s", null, "a_i", "14", "a_f", "10")
-        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
-
-    sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "qt", "/export");
-    stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-
-    Bucket[] buckets1 =  {new Bucket("a_s")};
-
-    Metric[] metrics1 = {new SumMetric("a_i"),
-        new SumMetric("a_f"),
-        new MinMetric("a_i"),
-        new MinMetric("a_f"),
-        new MaxMetric("a_i"),
-        new MaxMetric("a_f"),
-        new MeanMetric("a_i"),
-        new MeanMetric("a_f"),
-        new CountMetric()};
-
-    rollupStream = new RollupStream(stream, buckets1, metrics1);
-    tuples = getTuples(rollupStream);
-    //Check that we've got the extra NULL bucket
-    assertEquals(4, tuples.size());
-    tuple = tuples.get(0);
-    assertEquals("NULL", tuple.getString("a_s"));
-
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals(14, sumi.doubleValue(), 0.01);
-    assertEquals(10, sumf.doubleValue(), 0.01);
-    assertEquals(14, mini.doubleValue(), 0.01);
-    assertEquals(10, minf.doubleValue(), 0.01);
-    assertEquals(14, maxi.doubleValue(), 0.01);
-    assertEquals(10, maxf.doubleValue(), 0.01);
-    assertEquals(14, avgi.doubleValue(), 0.01);
-    assertEquals(10, avgf.doubleValue(), 0.01);
-    assertEquals(1, count.doubleValue(), 0.01);
-
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    try {
+      SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
+      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+
+      Bucket[] buckets = {new Bucket("a_s")};
+
+      Metric[] metrics = {new SumMetric("a_i"),
+          new SumMetric("a_f"),
+          new MinMetric("a_i"),
+          new MinMetric("a_f"),
+          new MaxMetric("a_i"),
+          new MaxMetric("a_f"),
+          new MeanMetric("a_i"),
+          new MeanMetric("a_f"),
+          new CountMetric()};
+
+      RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
+      rollupStream.setStreamContext(streamContext);
+      List<Tuple> tuples = getTuples(rollupStream);
+
+      assert (tuples.size() == 3);
+
+      //Test Long and Double Sums
+
+      Tuple tuple = tuples.get(0);
+      String bucket = tuple.getString("a_s");
+      Double sumi = tuple.getDouble("sum(a_i)");
+      Double sumf = tuple.getDouble("sum(a_f)");
+      Double mini = tuple.getDouble("min(a_i)");
+      Double minf = tuple.getDouble("min(a_f)");
+      Double maxi = tuple.getDouble("max(a_i)");
+      Double maxf = tuple.getDouble("max(a_f)");
+      Double avgi = tuple.getDouble("avg(a_i)");
+      Double avgf = tuple.getDouble("avg(a_f)");
+      Double count = tuple.getDouble("count(*)");
+
+
+      assertEquals("hello0", bucket);
+      assertEquals(17, sumi.doubleValue(), 0.001);
+      assertEquals(18, sumf.doubleValue(), 0.001);
+      assertEquals(0, mini.doubleValue(), 0.001);
+      assertEquals(1, minf.doubleValue(), 0.001);
+      assertEquals(14, maxi.doubleValue(), 0.001);
+      assertEquals(10, maxf.doubleValue(), 0.001);
+      assertEquals(4.25, avgi.doubleValue(), 0.001);
+      assertEquals(4.5, avgf.doubleValue(), 0.001);
+      assertEquals(4, count.doubleValue(), 0.001);
+
+
+      tuple = tuples.get(1);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello3", bucket);
+      assertEquals(38, sumi.doubleValue(), 0.001);
+      assertEquals(26, sumf.doubleValue(), 0.001);
+      assertEquals(3, mini.doubleValue(), 0.001);
+      assertEquals(3, minf.doubleValue(), 0.001);
+      assertEquals(13, maxi.doubleValue(), 0.001);
+      assertEquals(9, maxf.doubleValue(), 0.001);
+      assertEquals(9.5, avgi.doubleValue(), 0.001);
+      assertEquals(6.5, avgf.doubleValue(), 0.001);
+      assertEquals(4, count.doubleValue(), 0.001);
+
+
+      tuple = tuples.get(2);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello4", bucket);
+      assertEquals(15, sumi.longValue());
+      assertEquals(11, sumf.doubleValue(), 0.01);
+      assertEquals(4, mini.doubleValue(), 0.01);
+      assertEquals(4, minf.doubleValue(), 0.01);
+      assertEquals(11, maxi.doubleValue(), 0.01);
+      assertEquals(7, maxf.doubleValue(), 0.01);
+      assertEquals(7.5, avgi.doubleValue(), 0.01);
+      assertEquals(5.5, avgf.doubleValue(), 0.01);
+      assertEquals(2, count.doubleValue(), 0.01);
+
+      // Test with null metrics
+      rollupStream = new RollupStream(stream, buckets, metrics);
+      rollupStream.setStreamContext(streamContext);
+      tuples = getTuples(rollupStream);
+
+      assert (tuples.size() == 3);
+      tuple = tuples.get(0);
+      bucket = tuple.getString("a_s");
+      assertTrue(bucket.equals("hello0"));
+
+      tuple = tuples.get(1);
+      bucket = tuple.getString("a_s");
+      assertTrue(bucket.equals("hello3"));
+
+      tuple = tuples.get(2);
+      bucket = tuple.getString("a_s");
+      assertTrue(bucket.equals("hello4"));
+
+
+      //Test with null value in the grouping field
+      new UpdateRequest()
+          .add(id, "12", "a_s", null, "a_i", "14", "a_f", "10")
+          .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+      sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "qt", "/export");
+      stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+      Bucket[] buckets1 = {new Bucket("a_s")};
+
+      Metric[] metrics1 = {new SumMetric("a_i"),
+          new SumMetric("a_f"),
+          new MinMetric("a_i"),
+          new MinMetric("a_f"),
+          new MaxMetric("a_i"),
+          new MaxMetric("a_f"),
+          new MeanMetric("a_i"),
+          new MeanMetric("a_f"),
+          new CountMetric()};
+
+      rollupStream = new RollupStream(stream, buckets1, metrics1);
+      rollupStream.setStreamContext(streamContext);
+      tuples = getTuples(rollupStream);
+      //Check that we've got the extra NULL bucket
+      assertEquals(4, tuples.size());
+      tuple = tuples.get(0);
+      assertEquals("NULL", tuple.getString("a_s"));
+
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals(14, sumi.doubleValue(), 0.01);
+      assertEquals(10, sumf.doubleValue(), 0.01);
+      assertEquals(14, mini.doubleValue(), 0.01);
+      assertEquals(10, minf.doubleValue(), 0.01);
+      assertEquals(14, maxi.doubleValue(), 0.01);
+      assertEquals(10, maxf.doubleValue(), 0.01);
+      assertEquals(14, avgi.doubleValue(), 0.01);
+      assertEquals(10, avgf.doubleValue(), 0.01);
+      assertEquals(1, count.doubleValue(), 0.01);
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -1583,66 +1715,71 @@ public void testTrace() throws Exception {
     SolrClientCache cache = new SolrClientCache();
     context.setSolrClientCache(cache);
 
-    SolrParams sParams = mapParams("q", "a_s:hello0", "rows", "500", "fl", "id");
+    try {
+      SolrParams sParams = mapParams("q", "a_s:hello0", "rows", "500", "fl", "id");
 
-    TopicStream topicStream = new TopicStream(zkHost,
-        COLLECTIONORALIAS,
-        COLLECTIONORALIAS,
-                                              "50000000",
-                                              -1,
-                                              1000000, sParams);
+      TopicStream topicStream = new TopicStream(zkHost,
+          COLLECTIONORALIAS,
+          COLLECTIONORALIAS,
+          "50000000",
+          -1,
+          1000000, sParams);
 
-    DaemonStream daemonStream = new DaemonStream(topicStream, "daemon1", 1000, 500);
-    daemonStream.setStreamContext(context);
+      DaemonStream daemonStream = new DaemonStream(topicStream, "daemon1", 1000, 500);
+      daemonStream.setStreamContext(context);
 
-    daemonStream.open();
+      daemonStream.open();
 
-    // Wait for the checkpoint
-    JettySolrRunner jetty = cluster.getJettySolrRunners().get(0);
+      // Wait for the checkpoint
+      JettySolrRunner jetty = cluster.getJettySolrRunners().get(0);
 
 
-    SolrParams sParams1 = mapParams("qt", "/get", "ids", "50000000", "fl", "id");
-    int count = 0;
-    while(count == 0) {
-      SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/" + COLLECTIONORALIAS, sParams1);
-      List<Tuple> tuples = getTuples(solrStream);
-      count = tuples.size();
-      if(count > 0) {
-        Tuple t = tuples.get(0);
-        assertTrue(t.getLong("id") == 50000000);
-      } else {
-        System.out.println("###### Waiting for checkpoint #######:" + count);
+      SolrParams sParams1 = mapParams("qt", "/get", "ids", "50000000", "fl", "id");
+      int count = 0;
+      while (count == 0) {
+        SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/" + COLLECTIONORALIAS, sParams1);
+        solrStream.setStreamContext(context);
+        List<Tuple> tuples = getTuples(solrStream);
+        count = tuples.size();
+        if (count > 0) {
+          Tuple t = tuples.get(0);
+          assertTrue(t.getLong("id") == 50000000);
+        } else {
+          System.out.println("###### Waiting for checkpoint #######:" + count);
+        }
       }
-    }
 
-    new UpdateRequest()
-        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
-        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
-        .add(id, "3", "a_s", "hello0", "a_i", "3", "a_f", "3")
-        .add(id, "4", "a_s", "hello0", "a_i", "4", "a_f", "4")
-        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
-        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+      new UpdateRequest()
+          .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+          .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+          .add(id, "3", "a_s", "hello0", "a_i", "3", "a_f", "3")
+          .add(id, "4", "a_s", "hello0", "a_i", "4", "a_f", "4")
+          .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+          .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-    for(int i=0; i<5; i++) {
-      daemonStream.read();
-    }
+      for (int i = 0; i < 5; i++) {
+        daemonStream.read();
+      }
 
-    new UpdateRequest()
-        .add(id, "5", "a_s", "hello0", "a_i", "4", "a_f", "4")
-        .add(id, "6", "a_s", "hello0", "a_i", "4", "a_f", "4")
-        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+      new UpdateRequest()
+          .add(id, "5", "a_s", "hello0", "a_i", "4", "a_f", "4")
+          .add(id, "6", "a_s", "hello0", "a_i", "4", "a_f", "4")
+          .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-    for(int i=0; i<2; i++) {
-      daemonStream.read();
-    }
+      for (int i = 0; i < 2; i++) {
+        daemonStream.read();
+      }
+
+      daemonStream.shutdown();
 
-    daemonStream.shutdown();
+      Tuple tuple = daemonStream.read();
 
-    Tuple tuple = daemonStream.read();
+      assertTrue(tuple.EOF);
+      daemonStream.close();
+    } finally {
+      cache.close();
+    }
 
-    assertTrue(tuple.EOF);
-    daemonStream.close();
-    cache.close();
 
   }
 
@@ -1662,99 +1799,107 @@ public void testTrace() throws Exception {
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-    SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "partitionKeys", "a_s");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-
-    Bucket[] buckets =  {new Bucket("a_s")};
-
-    Metric[] metrics = {new SumMetric("a_i"),
-                        new SumMetric("a_f"),
-                        new MinMetric("a_i"),
-                        new MinMetric("a_f"),
-                        new MaxMetric("a_i"),
-                        new MaxMetric("a_f"),
-                        new MeanMetric("a_i"),
-                        new MeanMetric("a_f"),
-                        new CountMetric()};
-
-    RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
-    ParallelStream parallelStream = parallelStream(rollupStream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
-    attachStreamFactory(parallelStream);
-    List<Tuple> tuples = getTuples(parallelStream);
-
-    assertEquals(3, tuples.size());
-
-    //Test Long and Double Sums
-
-    Tuple tuple = tuples.get(0);
-    String bucket = tuple.getString("a_s");
-    Double sumi = tuple.getDouble("sum(a_i)");
-    Double sumf = tuple.getDouble("sum(a_f)");
-    Double mini = tuple.getDouble("min(a_i)");
-    Double minf = tuple.getDouble("min(a_f)");
-    Double maxi = tuple.getDouble("max(a_i)");
-    Double maxf = tuple.getDouble("max(a_f)");
-    Double avgi = tuple.getDouble("avg(a_i)");
-    Double avgf = tuple.getDouble("avg(a_f)");
-    Double count = tuple.getDouble("count(*)");
-
-    assertEquals("hello0", bucket);
-    assertEquals(17, sumi.doubleValue(), 0.001);
-    assertEquals(18, sumf.doubleValue(), 0.001);
-    assertEquals(0, mini.doubleValue(), 0.001);
-    assertEquals(1, minf.doubleValue(), 0.001);
-    assertEquals(14, maxi.doubleValue(), 0.001);
-    assertEquals(10, maxf.doubleValue(), 0.001);
-    assertEquals(4.25, avgi.doubleValue(), 0.001);
-    assertEquals(4.5, avgf.doubleValue(), 0.001);
-    assertEquals(4, count.doubleValue(), 0.001);
-
-    tuple = tuples.get(1);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello3", bucket);
-    assertEquals(38, sumi.doubleValue(), 0.001);
-    assertEquals(26, sumf.doubleValue(), 0.001);
-    assertEquals(3, mini.doubleValue(), 0.001);
-    assertEquals(3, minf.doubleValue(), 0.001);
-    assertEquals(13, maxi.doubleValue(), 0.001);
-    assertEquals(9, maxf.doubleValue(), 0.001);
-    assertEquals(9.5, avgi.doubleValue(), 0.001);
-    assertEquals(6.5, avgf.doubleValue(), 0.001);
-    assertEquals(4, count.doubleValue(), 0.001);
-
-    tuple = tuples.get(2);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello4", bucket);
-    assertEquals(15, sumi.longValue());
-    assertEquals(11, sumf.doubleValue(), 0.001);
-    assertEq

<TRUNCATED>

[2/4] lucene-solr:master: SOLR-10274: The search Streaming Expression should work in non-SolrCloud mode

Posted by jb...@apache.org.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/06a55b73/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index bb0bd7e..b69195a 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -127,63 +127,119 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     StreamExpression expression;
     CloudSolrStream stream;
     List<Tuple> tuples;
-    
-    // Basic test
-    expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
-    stream = new CloudSolrStream(expression, factory);
-    tuples = getTuples(stream);
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
 
-    assert(tuples.size() == 5);
-    assertOrder(tuples, 0, 2, 1, 3, 4);
-    assertLong(tuples.get(0), "a_i", 0);
+    try {
+      // Basic test
+      expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
+      stream = new CloudSolrStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
 
-    // Basic w/aliases
-    expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", aliases=\"a_i=alias.a_i, a_s=name\")");
-    stream = new CloudSolrStream(expression, factory);
-    tuples = getTuples(stream);
+      assert (tuples.size() == 5);
+      assertOrder(tuples, 0, 2, 1, 3, 4);
+      assertLong(tuples.get(0), "a_i", 0);
 
-    assert(tuples.size() == 5);
-    assertOrder(tuples, 0, 2, 1, 3, 4);
-    assertLong(tuples.get(0), "alias.a_i", 0);
-    assertString(tuples.get(0), "name", "hello0");
-
-    // Basic filtered test
-    expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
-    stream = new CloudSolrStream(expression, factory);
-    tuples = getTuples(stream);
+      // Basic w/aliases
+      expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", aliases=\"a_i=alias.a_i, a_s=name\")");
+      stream = new CloudSolrStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
 
-    assert(tuples.size() == 3);
-    assertOrder(tuples, 0, 3, 4);
-    assertLong(tuples.get(1), "a_i", 3);
+      assert (tuples.size() == 5);
+      assertOrder(tuples, 0, 2, 1, 3, 4);
+      assertLong(tuples.get(0), "alias.a_i", 0);
+      assertString(tuples.get(0), "name", "hello0");
 
-    try {
-      expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
+      // Basic filtered test
+      expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
       stream = new CloudSolrStream(expression, factory);
+      stream.setStreamContext(streamContext);
       tuples = getTuples(stream);
-      throw new Exception("Should be an exception here");
-    } catch(Exception e) {
-      assertTrue(e.getMessage().contains("q param expected for search function"));
-    }
 
-    try {
-      expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=\"blah\", sort=\"a_f asc, a_i asc\")");
+      assert (tuples.size() == 3);
+      assertOrder(tuples, 0, 3, 4);
+      assertLong(tuples.get(1), "a_i", 3);
+
+      try {
+        expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
+        stream = new CloudSolrStream(expression, factory);
+        stream.setStreamContext(streamContext);
+        tuples = getTuples(stream);
+        throw new Exception("Should be an exception here");
+      } catch (Exception e) {
+        assertTrue(e.getMessage().contains("q param expected for search function"));
+      }
+
+      try {
+        expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=\"blah\", sort=\"a_f asc, a_i asc\")");
+        stream = new CloudSolrStream(expression, factory);
+        stream.setStreamContext(streamContext);
+        tuples = getTuples(stream);
+        throw new Exception("Should be an exception here");
+      } catch (Exception e) {
+        assertTrue(e.getMessage().contains("fl param expected for search function"));
+      }
+
+      try {
+        expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=\"blah\", fl=\"id, a_f\", sort=\"a_f\")");
+        stream = new CloudSolrStream(expression, factory);
+        stream.setStreamContext(streamContext);
+        tuples = getTuples(stream);
+        throw new Exception("Should be an exception here");
+      } catch (Exception e) {
+        assertTrue(e.getMessage().contains("Invalid sort spec"));
+      }
+
+      // Test with shards param
+
+      List<String> shardUrls = TupleStream.getShards(cluster.getZkServer().getZkAddress(), COLLECTIONORALIAS, streamContext);
+
+      Map<String, List<String>> shardsMap = new HashMap();
+      shardsMap.put("myCollection", shardUrls);
+      StreamContext context = new StreamContext();
+      context.put("shards", shardsMap);
+      context.setSolrClientCache(solrClientCache);
+
+      // Basic test
+      expression = StreamExpressionParser.parse("search(myCollection, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
       stream = new CloudSolrStream(expression, factory);
+      stream.setStreamContext(context);
       tuples = getTuples(stream);
-      throw new Exception("Should be an exception here");
-    } catch(Exception e) {
-      assertTrue(e.getMessage().contains("fl param expected for search function"));
-    }
 
+      assert (tuples.size() == 5);
+      assertOrder(tuples, 0, 2, 1, 3, 4);
+      assertLong(tuples.get(0), "a_i", 0);
 
-    try {
-      expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=\"blah\", fl=\"id, a_f\", sort=\"a_f\")");
-      stream = new CloudSolrStream(expression, factory);
+
+      //Exercise the /stream handler
+
+      //Add the shards http parameter for the myCollection
+      StringBuilder buf = new StringBuilder();
+      for (String shardUrl : shardUrls) {
+        if (buf.length() > 0) {
+          buf.append(",");
+        }
+        buf.append(shardUrl);
+      }
+
+      ModifiableSolrParams solrParams = new ModifiableSolrParams();
+      solrParams.add("qt", "/stream");
+      solrParams.add("expr", "search(myCollection, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
+      solrParams.add("myCollection.shards", buf.toString());
+      SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams);
+      stream.setStreamContext(context);
       tuples = getTuples(stream);
-      throw new Exception("Should be an exception here");
-    } catch(Exception e) {
-      assertTrue(e.getMessage().contains("Invalid sort spec"));
-    }
 
+      assert (tuples.size() == 5);
+      assertOrder(tuples, 0, 2, 1, 3, 4);
+      assertLong(tuples.get(0), "a_i", 0);
+
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -200,55 +256,66 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     StreamFactory factory = new StreamFactory();
     StreamExpression expression;
     CloudSolrStream stream;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
     List<Tuple> tuples;
-    
-    // Basic test
-    expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", zkHost=" + cluster.getZkServer().getZkAddress() + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
-    stream = new CloudSolrStream(expression, factory);
-    tuples = getTuples(stream);
 
-    assert(tuples.size() == 5);
-    assertOrder(tuples, 0, 2, 1, 3, 4);
-    assertLong(tuples.get(0), "a_i", 0);
+    try {
+      // Basic test
+      expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", zkHost=" + cluster.getZkServer().getZkAddress() + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
+      stream = new CloudSolrStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
 
-    // Basic w/aliases
-    expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", aliases=\"a_i=alias.a_i, a_s=name\", zkHost=" + cluster.getZkServer().getZkAddress() + ")");
-    stream = new CloudSolrStream(expression, factory);
-    tuples = getTuples(stream);
+      assert (tuples.size() == 5);
+      assertOrder(tuples, 0, 2, 1, 3, 4);
+      assertLong(tuples.get(0), "a_i", 0);
 
-    assert(tuples.size() == 5);
-    assertOrder(tuples, 0, 2, 1, 3, 4);
-    assertLong(tuples.get(0), "alias.a_i", 0);
-    assertString(tuples.get(0), "name", "hello0");
+      // Basic w/aliases
+      expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", aliases=\"a_i=alias.a_i, a_s=name\", zkHost=" + cluster.getZkServer().getZkAddress() + ")");
+      stream = new CloudSolrStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
 
-    // Basic filtered test
-    expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", zkHost="
-        + cluster.getZkServer().getZkAddress() + ", sort=\"a_f asc, a_i asc\")");
-    stream = new CloudSolrStream(expression, factory);
-    tuples = getTuples(stream);
+      assert (tuples.size() == 5);
+      assertOrder(tuples, 0, 2, 1, 3, 4);
+      assertLong(tuples.get(0), "alias.a_i", 0);
+      assertString(tuples.get(0), "name", "hello0");
 
-    assert(tuples.size() == 3);
-    assertOrder(tuples, 0, 3, 4);
-    assertLong(tuples.get(1), "a_i", 3);
+      // Basic filtered test
+      expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", zkHost="
+          + cluster.getZkServer().getZkAddress() + ", sort=\"a_f asc, a_i asc\")");
+      stream = new CloudSolrStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
 
+      assert (tuples.size() == 3);
+      assertOrder(tuples, 0, 3, 4);
+      assertLong(tuples.get(1), "a_i", 3);
 
-    // Test a couple of multile field lists.
-    expression = StreamExpressionParser.parse("search(collection1, fq=\"a_s:hello0\", fq=\"a_s:hello1\", q=\"id:(*)\", " +
-        "zkHost=" + cluster.getZkServer().getZkAddress()+ ", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
-    stream = new CloudSolrStream(expression, factory);
-    tuples = getTuples(stream);
 
-    assertEquals("fq clauses should have prevented any docs from coming back", tuples.size(), 0);
+      // Test a couple of multiple field lists.
+      expression = StreamExpressionParser.parse("search(collection1, fq=\"a_s:hello0\", fq=\"a_s:hello1\", q=\"id:(*)\", " +
+          "zkHost=" + cluster.getZkServer().getZkAddress() + ", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
+      stream = new CloudSolrStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
 
+      assertEquals("fq clauses should have prevented any docs from coming back", tuples.size(), 0);
 
-    expression = StreamExpressionParser.parse("search(collection1, fq=\"a_s:(hello0 OR hello1)\", q=\"id:(*)\", " +
-        "zkHost=" + cluster.getZkServer().getZkAddress() + ", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
-    stream = new CloudSolrStream(expression, factory);
-    tuples = getTuples(stream);
 
-    assertEquals("Combining an f1 clause should show us 2 docs", tuples.size(), 2);
-    
-        
+      expression = StreamExpressionParser.parse("search(collection1, fq=\"a_s:(hello0 OR hello1)\", q=\"id:(*)\", " +
+          "zkHost=" + cluster.getZkServer().getZkAddress() + ", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
+      stream = new CloudSolrStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assertEquals("Combining an f1 clause should show us 2 docs", tuples.size(), 2);
+
+    } finally {
+      solrClientCache.close();
+    }
 
   }
 
@@ -315,43 +382,53 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     StreamExpression expression;
     TupleStream stream;
     List<Tuple> tuples;
-    
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
     StreamFactory factory = new StreamFactory()
       .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
       .withFunctionName("search", CloudSolrStream.class)
       .withFunctionName("unique", UniqueStream.class);
-    
-    // Basic test
-    expression = StreamExpressionParser.parse("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f\")");
-    stream = new UniqueStream(expression, factory);
-    tuples = getTuples(stream);
-    
-    assert(tuples.size() == 4);
-    assertOrder(tuples, 0, 1, 3, 4);
 
-    // Basic test desc
-    expression = StreamExpressionParser.parse("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc, a_i desc\"), over=\"a_f\")");
-    stream = new UniqueStream(expression, factory);
-    tuples = getTuples(stream);
-    
-    assert(tuples.size() == 4);
-    assertOrder(tuples, 4, 3, 1, 2);
-    
-    // Basic w/multi comp
-    expression = StreamExpressionParser.parse("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f, a_i\")");
-    stream = new UniqueStream(expression, factory);
-    tuples = getTuples(stream);
-    
-    assert(tuples.size() == 5);
-    assertOrder(tuples, 0,2,1,3,4);
-    
-    // full factory w/multi comp
-    stream = factory.constructStream("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f, a_i\")");
-    tuples = getTuples(stream);
-    
-    assert(tuples.size() == 5);
-    assertOrder(tuples, 0, 2, 1, 3, 4);
+    try {
+      // Basic test
+      expression = StreamExpressionParser.parse("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f\")");
+      stream = new UniqueStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
 
+      assert (tuples.size() == 4);
+      assertOrder(tuples, 0, 1, 3, 4);
+
+      // Basic test desc
+      expression = StreamExpressionParser.parse("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc, a_i desc\"), over=\"a_f\")");
+      stream = new UniqueStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 4);
+      assertOrder(tuples, 4, 3, 1, 2);
+
+      // Basic w/multi comp
+      expression = StreamExpressionParser.parse("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f, a_i\")");
+      stream = new UniqueStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 5);
+      assertOrder(tuples, 0, 2, 1, 3, 4);
+
+      // full factory w/multi comp
+      stream = factory.constructStream("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f, a_i\")");
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 5);
+      assertOrder(tuples, 0, 2, 1, 3, 4);
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -369,30 +446,38 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     StreamExpression expression;
     TupleStream stream;
     List<Tuple> tuples;
-    
-    StreamFactory factory = new StreamFactory()
-      .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
-      .withFunctionName("search", CloudSolrStream.class)
-      .withFunctionName("sort", SortStream.class);
-    
-    // Basic test
-    stream = factory.constructStream("sort(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc\")");
-    tuples = getTuples(stream);
-    assert(tuples.size() == 6);
-    assertOrder(tuples, 0, 1, 5, 2, 3, 4);
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+    try {
+      StreamFactory factory = new StreamFactory()
+          .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+          .withFunctionName("search", CloudSolrStream.class)
+          .withFunctionName("sort", SortStream.class);
 
-    // Basic test desc
-    stream = factory.constructStream("sort(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i desc\")");
-    tuples = getTuples(stream);
-    assert(tuples.size() == 6);
-    assertOrder(tuples, 4,3,2,1,5,0);
-    
-    // Basic w/multi comp
-    stream = factory.constructStream("sort(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc, a_f desc\")");
-    tuples = getTuples(stream);
-    assert(tuples.size() == 6);
-    assertOrder(tuples, 0,5,1,2,3,4);
+      // Basic test
+      stream = factory.constructStream("sort(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc\")");
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assert (tuples.size() == 6);
+      assertOrder(tuples, 0, 1, 5, 2, 3, 4);
 
+      // Basic test desc
+      stream = factory.constructStream("sort(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i desc\")");
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assert (tuples.size() == 6);
+      assertOrder(tuples, 4, 3, 2, 1, 5, 0);
+
+      // Basic w/multi comp
+      stream = factory.constructStream("sort(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc, a_f desc\")");
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assert (tuples.size() == 6);
+      assertOrder(tuples, 0, 5, 1, 2, 3, 4);
+    } finally {
+      solrClientCache.close();
+    }
   }
 
 
@@ -411,17 +496,24 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     StreamExpression expression;
     TupleStream stream;
     List<Tuple> tuples;
-
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
     StreamFactory factory = new StreamFactory()
         .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
         .withFunctionName("search", CloudSolrStream.class)
         .withFunctionName("null", NullStream.class);
 
-    // Basic test
-    stream = factory.constructStream("null(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc\")");
-    tuples = getTuples(stream);
-    assertTrue(tuples.size() == 1);
-    assertTrue(tuples.get(0).getLong("nullCount") == 6);
+    try {
+      // Basic test
+      stream = factory.constructStream("null(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc\")");
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assertTrue(tuples.size() == 1);
+      assertTrue(tuples.get(0).getLong("nullCount") == 6);
+    } finally {
+      solrClientCache.close();
+    }
   }
 
 
@@ -440,6 +532,9 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     StreamExpression expression;
     TupleStream stream;
     List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
 
     StreamFactory factory = new StreamFactory()
         .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
@@ -447,18 +542,23 @@ public class StreamExpressionTest extends SolrCloudTestCase {
         .withFunctionName("null", NullStream.class)
         .withFunctionName("parallel", ParallelStream.class);
 
-    // Basic test
-    stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"nullCount desc\", null(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), by=\"a_i asc\"))");
-    tuples = getTuples(stream);
-    assertTrue(tuples.size() == 2);
-    long nullCount = 0;
-    for(Tuple t : tuples) {
-      nullCount += t.getLong("nullCount");
-    }
+    try {
 
-    assertEquals(nullCount, 6L);
-  }
+      // Basic test
+      stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"nullCount desc\", null(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), by=\"a_i asc\"))");
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assertTrue(tuples.size() == 2);
+      long nullCount = 0;
+      for (Tuple t : tuples) {
+        nullCount += t.getLong("nullCount");
+      }
 
+      assertEquals(nullCount, 6L);
+    } finally {
+      solrClientCache.close();
+    }
+  }
 
   @Test
   public void testNulls() throws Exception {
@@ -475,49 +575,59 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     TupleStream stream;
     List<Tuple> tuples;
     Tuple tuple;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
     StreamFactory factory = new StreamFactory()
         .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
         .withFunctionName("search", CloudSolrStream.class);
-    // Basic test
-    expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f, s_multi, i_multi\", qt=\"/export\", sort=\"a_i asc\")");
-    stream = new CloudSolrStream(expression, factory);
-    tuples = getTuples(stream);
+    try {
+      // Basic test
+      expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f, s_multi, i_multi\", qt=\"/export\", sort=\"a_i asc\")");
+      stream = new CloudSolrStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
 
-    assert(tuples.size() == 5);
-    assertOrder(tuples, 4, 0, 1, 2, 3);
+      assert (tuples.size() == 5);
+      assertOrder(tuples, 4, 0, 1, 2, 3);
 
-    tuple = tuples.get(0);
-    assertTrue("hello4".equals(tuple.getString("a_s")));
-    assertNull(tuple.get("s_multi"));
-    assertNull(tuple.get("i_multi"));
-    assertNull(tuple.getLong("a_i"));
+      tuple = tuples.get(0);
+      assertTrue("hello4".equals(tuple.getString("a_s")));
+      assertNull(tuple.get("s_multi"));
+      assertNull(tuple.get("i_multi"));
+      assertNull(tuple.getLong("a_i"));
 
 
-    tuple = tuples.get(1);
-    assertNull(tuple.get("a_s"));
-    List<String> strings = tuple.getStrings("s_multi");
-    assertNotNull(strings);
-    assertEquals("aaa", strings.get(0));
-    assertEquals("bbb", strings.get(1));
-    List<Long> longs = tuple.getLongs("i_multi");
-    assertNotNull(longs);
-
-    //test sort (asc) with null string field. Null should sort to the top.
-    expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f, s_multi, i_multi\", qt=\"/export\", sort=\"a_s asc\")");
-    stream = new CloudSolrStream(expression, factory);
-    tuples = getTuples(stream);
+      tuple = tuples.get(1);
+      assertNull(tuple.get("a_s"));
+      List<String> strings = tuple.getStrings("s_multi");
+      assertNotNull(strings);
+      assertEquals("aaa", strings.get(0));
+      assertEquals("bbb", strings.get(1));
+      List<Long> longs = tuple.getLongs("i_multi");
+      assertNotNull(longs);
 
-    assert(tuples.size() == 5);
-    assertOrder(tuples, 0, 1, 2, 3, 4);
+      //test sort (asc) with null string field. Null should sort to the top.
+      expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f, s_multi, i_multi\", qt=\"/export\", sort=\"a_s asc\")");
+      stream = new CloudSolrStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
 
-    //test sort(desc) with null string field.  Null should sort to the bottom.
-    expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f, s_multi, i_multi\", qt=\"/export\", sort=\"a_s desc\")");
-    stream = new CloudSolrStream(expression, factory);
-    tuples = getTuples(stream);
+      assert (tuples.size() == 5);
+      assertOrder(tuples, 0, 1, 2, 3, 4);
 
-    assert(tuples.size() == 5);
-    assertOrder(tuples, 4, 3, 2, 1, 0);
+      //test sort(desc) with null string field.  Null should sort to the bottom.
+      expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f, s_multi, i_multi\", qt=\"/export\", sort=\"a_s desc\")");
+      stream = new CloudSolrStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
 
+      assert (tuples.size() == 5);
+      assertOrder(tuples, 4, 3, 2, 1, 0);
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -546,55 +656,67 @@ public class StreamExpressionTest extends SolrCloudTestCase {
         + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"),"
         + "search(" + COLLECTIONORALIAS + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"),"
         + "on=\"a_f asc\")");
-    stream = new MergeStream(expression, factory);
-    tuples = getTuples(stream);
-    
-    assert(tuples.size() == 4);
-    assertOrder(tuples, 0, 1, 3, 4);
 
-    // Basic test desc
-    expression = StreamExpressionParser.parse("merge("
-        + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
-        + "search(" + COLLECTIONORALIAS + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
-        + "on=\"a_f desc\")");
     stream = new MergeStream(expression, factory);
-    tuples = getTuples(stream);
-    
-    assert(tuples.size() == 4);
-    assertOrder(tuples, 4, 3, 1, 0);
-    
-    // Basic w/multi comp
-    expression = StreamExpressionParser.parse("merge("
-        + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
-        + "search(" + COLLECTIONORALIAS + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
-        + "on=\"a_f asc, a_s asc\")");
-    stream = new MergeStream(expression, factory);
-    tuples = getTuples(stream);
-    
-    assert(tuples.size() == 5);
-    assertOrder(tuples, 0, 2, 1, 3, 4);
-    
-    // full factory w/multi comp
-    stream = factory.constructStream("merge("
-        + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
-        + "search(" + COLLECTIONORALIAS + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
-        + "on=\"a_f asc, a_s asc\")");
-    tuples = getTuples(stream);
-    
-    assert(tuples.size() == 5);
-    assertOrder(tuples, 0, 2, 1, 3, 4);
-    
-    // full factory w/multi streams
-    stream = factory.constructStream("merge("
-        + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
-        + "search(" + COLLECTIONORALIAS + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
-        + "search(" + COLLECTIONORALIAS + ", q=\"id:(2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
-        + "on=\"a_f asc\")");
-    tuples = getTuples(stream);
-    
-    assert(tuples.size() == 4);
-    assertOrder(tuples, 0, 2, 1, 4);
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+    try {
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 4);
+      assertOrder(tuples, 0, 1, 3, 4);
+
+      // Basic test desc
+      expression = StreamExpressionParser.parse("merge("
+          + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
+          + "search(" + COLLECTIONORALIAS + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
+          + "on=\"a_f desc\")");
+      stream = new MergeStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 4);
+      assertOrder(tuples, 4, 3, 1, 0);
+
+      // Basic w/multi comp
+      expression = StreamExpressionParser.parse("merge("
+          + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+          + "search(" + COLLECTIONORALIAS + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+          + "on=\"a_f asc, a_s asc\")");
+      stream = new MergeStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 5);
+      assertOrder(tuples, 0, 2, 1, 3, 4);
+
+      // full factory w/multi comp
+      stream = factory.constructStream("merge("
+          + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+          + "search(" + COLLECTIONORALIAS + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+          + "on=\"a_f asc, a_s asc\")");
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 5);
+      assertOrder(tuples, 0, 2, 1, 3, 4);
+
+      // full factory w/multi streams
+      stream = factory.constructStream("merge("
+          + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+          + "search(" + COLLECTIONORALIAS + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+          + "search(" + COLLECTIONORALIAS + ", q=\"id:(2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+          + "on=\"a_f asc\")");
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
 
+      assert (tuples.size() == 4);
+      assertOrder(tuples, 0, 2, 1, 4);
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -611,61 +733,70 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     StreamExpression expression;
     TupleStream stream;
     List<Tuple> tuples;
-    
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
     StreamFactory factory = new StreamFactory()
       .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
       .withFunctionName("search", CloudSolrStream.class)
       .withFunctionName("unique", UniqueStream.class)
       .withFunctionName("top", RankStream.class);
-    
-    // Basic test
-    expression = StreamExpressionParser.parse("top("
-        + "n=3,"
-        + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
-        + "sort=\"a_f asc, a_i asc\")");
-    stream = new RankStream(expression, factory);
-    tuples = getTuples(stream);
-    
-    assert(tuples.size() == 3);
-    assertOrder(tuples, 0, 2, 1);
+    try {
+      // Basic test
+      expression = StreamExpressionParser.parse("top("
+          + "n=3,"
+          + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
+          + "sort=\"a_f asc, a_i asc\")");
+      stream = new RankStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
 
-    // Basic test desc
-    expression = StreamExpressionParser.parse("top("
-        + "n=2,"
-        + "unique("
-        + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
-        + "over=\"a_f\"),"
-        + "sort=\"a_f desc\")");
-    stream = new RankStream(expression, factory);
-    tuples = getTuples(stream);
-    
-    assert(tuples.size() == 2);
-    assertOrder(tuples, 4, 3);
-    
-    // full factory
-    stream = factory.constructStream("top("
-        + "n=4,"
-        + "unique("
-        + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
-        + "over=\"a_f\"),"
-        + "sort=\"a_f asc\")");
-    tuples = getTuples(stream);
-    
-    assert(tuples.size() == 4);
-    assertOrder(tuples, 0,1,3,4);
+      assert (tuples.size() == 3);
+      assertOrder(tuples, 0, 2, 1);
+
+      // Basic test desc
+      expression = StreamExpressionParser.parse("top("
+          + "n=2,"
+          + "unique("
+          + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
+          + "over=\"a_f\"),"
+          + "sort=\"a_f desc\")");
+      stream = new RankStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
 
-    // full factory, switch order
-    stream = factory.constructStream("top("
-            + "n=4,"
-            + "unique("
-            +   "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc, a_i desc\"),"
-            +   "over=\"a_f\"),"
-            + "sort=\"a_f asc\")");
-    tuples = getTuples(stream);
-    
-    assert(tuples.size() == 4);
-    assertOrder(tuples, 2,1,3,4);
+      assert (tuples.size() == 2);
+      assertOrder(tuples, 4, 3);
+
+      // full factory
+      stream = factory.constructStream("top("
+          + "n=4,"
+          + "unique("
+          + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
+          + "over=\"a_f\"),"
+          + "sort=\"a_f asc\")");
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 4);
+      assertOrder(tuples, 0, 1, 3, 4);
+
+      // full factory, switch order
+      stream = factory.constructStream("top("
+          + "n=4,"
+          + "unique("
+          + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc, a_i desc\"),"
+          + "over=\"a_f\"),"
+          + "sort=\"a_f asc\")");
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
 
+      assert (tuples.size() == 4);
+      assertOrder(tuples, 2, 1, 3, 4);
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -735,7 +866,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
 
       //Exercise the /stream handler
       ModifiableSolrParams sParams = new ModifiableSolrParams(StreamingTest.mapParams(CommonParams.QT, "/stream"));
-      sParams.add("expr", "random(" + COLLECTIONORALIAS + ", q=\"*:*\", rows=\"1\", fl=\"id, a_i\")" );
+      sParams.add("expr", "random(" + COLLECTIONORALIAS + ", q=\"*:*\", rows=\"1\", fl=\"id, a_i\")");
       JettySolrRunner jetty = cluster.getJettySolrRunner(0);
       SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/collection1", sParams);
       List<Tuple> tuples4 = getTuples(solrStream);
@@ -767,61 +898,69 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     List<Tuple> tuples;
     Tuple t0, t1, t2;
     List<Map> maps0, maps1, maps2;
-    
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
     StreamFactory factory = new StreamFactory()
         .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
         .withFunctionName("search", CloudSolrStream.class)
         .withFunctionName("reduce", ReducerStream.class)
         .withFunctionName("group", GroupOperation.class);
 
-    // basic
-    expression = StreamExpressionParser.parse("reduce("
-        + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\"),"
-        + "by=\"a_s\","
-        + "group(sort=\"a_f desc\", n=\"4\"))");
+    try {
+      // basic
+      expression = StreamExpressionParser.parse("reduce("
+          + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\"),"
+          + "by=\"a_s\","
+          + "group(sort=\"a_f desc\", n=\"4\"))");
 
-    stream = factory.constructStream(expression);
-    tuples = getTuples(stream);
+      stream = factory.constructStream(expression);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
 
-    assert(tuples.size() == 3);
+      assert (tuples.size() == 3);
 
-    t0 = tuples.get(0);
-    maps0 = t0.getMaps("group");
-    assertMaps(maps0, 9, 1, 2, 0);
+      t0 = tuples.get(0);
+      maps0 = t0.getMaps("group");
+      assertMaps(maps0, 9, 1, 2, 0);
 
-    t1 = tuples.get(1);
-    maps1 = t1.getMaps("group");
-    assertMaps(maps1, 8, 7, 5, 3);
+      t1 = tuples.get(1);
+      maps1 = t1.getMaps("group");
+      assertMaps(maps1, 8, 7, 5, 3);
 
 
-    t2 = tuples.get(2);
-    maps2 = t2.getMaps("group");
-    assertMaps(maps2, 6, 4);
-    
-    // basic w/spaces
-    expression = StreamExpressionParser.parse("reduce("
-        + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f       asc\"),"
-        + "by=\"a_s\"," +
-        "group(sort=\"a_i asc\", n=\"2\"))");
-    stream = factory.constructStream(expression);
-    tuples = getTuples(stream);
+      t2 = tuples.get(2);
+      maps2 = t2.getMaps("group");
+      assertMaps(maps2, 6, 4);
 
-    assert(tuples.size() == 3);
+      // basic w/spaces
+      expression = StreamExpressionParser.parse("reduce("
+          + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f       asc\"),"
+          + "by=\"a_s\"," +
+          "group(sort=\"a_i asc\", n=\"2\"))");
+      stream = factory.constructStream(expression);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
 
-    t0 = tuples.get(0);
-    maps0 = t0.getMaps("group");
-    assert(maps0.size() == 2);
+      assert (tuples.size() == 3);
 
-    assertMaps(maps0, 0, 1);
+      t0 = tuples.get(0);
+      maps0 = t0.getMaps("group");
+      assert (maps0.size() == 2);
 
-    t1 = tuples.get(1);
-    maps1 = t1.getMaps("group");
-    assertMaps(maps1, 3, 5);
+      assertMaps(maps0, 0, 1);
 
-    t2 = tuples.get(2);
-    maps2 = t2.getMaps("group");
-    assertMaps(maps2, 4, 6);
+      t1 = tuples.get(1);
+      maps1 = t1.getMaps("group");
+      assertMaps(maps1, 3, 5);
 
+      t2 = tuples.get(2);
+      maps2 = t2.getMaps("group");
+      assertMaps(maps2, 4, 6);
+    } finally {
+      solrClientCache.close();
+    }
   }
 
 
@@ -1158,66 +1297,76 @@ public class StreamExpressionTest extends SolrCloudTestCase {
         .add(id, "9", "a_s", "hello0", "a_i", "9", "a_f", "10", "subject", "blah blah blah 9")
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
     TupleStream stream;
     List<Tuple> tuples;
 
     StreamFactory factory = new StreamFactory()
         .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
         .withFunctionName("search", CloudSolrStream.class)
-        .withFunctionName("parallel", ParallelStream.class)
-        .withFunctionName("fetch", FetchStream.class);
-
-    stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTIONORALIAS + ",  search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\"))");
-    tuples = getTuples(stream);
-
-    assert(tuples.size() == 10);
-    Tuple t = tuples.get(0);
-    assertTrue("blah blah blah 0".equals(t.getString("subject")));
-    t = tuples.get(1);
-    assertTrue("blah blah blah 2".equals(t.getString("subject")));
-    t = tuples.get(2);
-    assertTrue("blah blah blah 3".equals(t.getString("subject")));
-    t = tuples.get(3);
-    assertTrue("blah blah blah 4".equals(t.getString("subject")));
-    t = tuples.get(4);
-    assertTrue("blah blah blah 1".equals(t.getString("subject")));
-    t = tuples.get(5);
-    assertTrue("blah blah blah 5".equals(t.getString("subject")));
-    t = tuples.get(6);
-    assertTrue("blah blah blah 6".equals(t.getString("subject")));
-    t = tuples.get(7);
-    assertTrue("blah blah blah 7".equals(t.getString("subject")));
-    t = tuples.get(8);
-    assertTrue("blah blah blah 8".equals(t.getString("subject")));
-    t = tuples.get(9);
-    assertTrue("blah blah blah 9".equals(t.getString("subject")));
-
-
-    stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTIONORALIAS + ",  search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\"))");
-    tuples = getTuples(stream);
-
-    assert(tuples.size() == 10);
-    t = tuples.get(0);
-    assertTrue("blah blah blah 0".equals(t.getString("subject")));
-    t = tuples.get(1);
-    assertTrue("blah blah blah 2".equals(t.getString("subject")));
-    t = tuples.get(2);
-    assertTrue("blah blah blah 3".equals(t.getString("subject")));
-    t = tuples.get(3);
-    assertTrue("blah blah blah 4".equals(t.getString("subject")));
-    t = tuples.get(4);
-    assertTrue("blah blah blah 1".equals(t.getString("subject")));
-    t = tuples.get(5);
-    assertTrue("blah blah blah 5".equals(t.getString("subject")));
-    t = tuples.get(6);
-    assertTrue("blah blah blah 6".equals(t.getString("subject")));
-    t = tuples.get(7);
-    assertTrue("blah blah blah 7".equals(t.getString("subject")));
-    t = tuples.get(8);
-    assertTrue("blah blah blah 8".equals(t.getString("subject")));
-    t = tuples.get(9);
-    assertTrue("blah blah blah 9".equals(t.getString("subject")));
+        .withFunctionName("parallel", ParallelStream.class)
+        .withFunctionName("fetch", FetchStream.class);
+
+    try {
+
+      stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTIONORALIAS + ",  search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\"))");
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 10);
+      Tuple t = tuples.get(0);
+      assertTrue("blah blah blah 0".equals(t.getString("subject")));
+      t = tuples.get(1);
+      assertTrue("blah blah blah 2".equals(t.getString("subject")));
+      t = tuples.get(2);
+      assertTrue("blah blah blah 3".equals(t.getString("subject")));
+      t = tuples.get(3);
+      assertTrue("blah blah blah 4".equals(t.getString("subject")));
+      t = tuples.get(4);
+      assertTrue("blah blah blah 1".equals(t.getString("subject")));
+      t = tuples.get(5);
+      assertTrue("blah blah blah 5".equals(t.getString("subject")));
+      t = tuples.get(6);
+      assertTrue("blah blah blah 6".equals(t.getString("subject")));
+      t = tuples.get(7);
+      assertTrue("blah blah blah 7".equals(t.getString("subject")));
+      t = tuples.get(8);
+      assertTrue("blah blah blah 8".equals(t.getString("subject")));
+      t = tuples.get(9);
+      assertTrue("blah blah blah 9".equals(t.getString("subject")));
+
+
+      stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTIONORALIAS + ",  search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\"))");
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
 
+      assert (tuples.size() == 10);
+      t = tuples.get(0);
+      assertTrue("blah blah blah 0".equals(t.getString("subject")));
+      t = tuples.get(1);
+      assertTrue("blah blah blah 2".equals(t.getString("subject")));
+      t = tuples.get(2);
+      assertTrue("blah blah blah 3".equals(t.getString("subject")));
+      t = tuples.get(3);
+      assertTrue("blah blah blah 4".equals(t.getString("subject")));
+      t = tuples.get(4);
+      assertTrue("blah blah blah 1".equals(t.getString("subject")));
+      t = tuples.get(5);
+      assertTrue("blah blah blah 5".equals(t.getString("subject")));
+      t = tuples.get(6);
+      assertTrue("blah blah blah 6".equals(t.getString("subject")));
+      t = tuples.get(7);
+      assertTrue("blah blah blah 7".equals(t.getString("subject")));
+      t = tuples.get(8);
+      assertTrue("blah blah blah 8".equals(t.getString("subject")));
+      t = tuples.get(9);
+      assertTrue("blah blah blah 9".equals(t.getString("subject")));
+    } finally {
+      solrClientCache.close();
+    }
   }
 
 
@@ -1260,87 +1409,91 @@ public class StreamExpressionTest extends SolrCloudTestCase {
         + "sum(a_i)"
         + "), id=\"test\", runInterval=\"1000\", queueSize=\"9\")");
     daemonStream = (DaemonStream)factory.constructStream(expression);
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+    daemonStream.setStreamContext(streamContext);
+    try {
+      //Test Long and Double Sums
 
+      daemonStream.open(); // This will start the daemon thread
 
-    //Test Long and Double Sums
-
-    daemonStream.open(); // This will start the daemon thread
-
-    for(int i=0; i<4; i++) {
-      Tuple tuple = daemonStream.read(); // Reads from the queue
-      String bucket = tuple.getString("a_s");
-      Double sumi = tuple.getDouble("sum(a_i)");
-
-      //System.out.println("#################################### Bucket 1:"+bucket);
-      assertTrue(bucket.equals("hello0"));
-      assertTrue(sumi.doubleValue() == 17.0D);
-
-      tuple = daemonStream.read();
-      bucket = tuple.getString("a_s");
-      sumi = tuple.getDouble("sum(a_i)");
-
-      //System.out.println("#################################### Bucket 2:"+bucket);
-      assertTrue(bucket.equals("hello3"));
-      assertTrue(sumi.doubleValue() == 38.0D);
+      for (int i = 0; i < 4; i++) {
+        Tuple tuple = daemonStream.read(); // Reads from the queue
+        String bucket = tuple.getString("a_s");
+        Double sumi = tuple.getDouble("sum(a_i)");
 
-      tuple = daemonStream.read();
-      bucket = tuple.getString("a_s");
-      sumi = tuple.getDouble("sum(a_i)");
-      //System.out.println("#################################### Bucket 3:"+bucket);
-      assertTrue(bucket.equals("hello4"));
-      assertTrue(sumi.longValue() == 15);
-    }
+        //System.out.println("#################################### Bucket 1:"+bucket);
+        assertTrue(bucket.equals("hello0"));
+        assertTrue(sumi.doubleValue() == 17.0D);
 
-    //Now lets wait until the internal queue fills up
+        tuple = daemonStream.read();
+        bucket = tuple.getString("a_s");
+        sumi = tuple.getDouble("sum(a_i)");
 
-    while(daemonStream.remainingCapacity() > 0) {
-      try {
-        Thread.sleep(1000);
-      } catch (Exception e) {
+        //System.out.println("#################################### Bucket 2:"+bucket);
+        assertTrue(bucket.equals("hello3"));
+        assertTrue(sumi.doubleValue() == 38.0D);
 
+        tuple = daemonStream.read();
+        bucket = tuple.getString("a_s");
+        sumi = tuple.getDouble("sum(a_i)");
+        //System.out.println("#################################### Bucket 3:"+bucket);
+        assertTrue(bucket.equals("hello4"));
+        assertTrue(sumi.longValue() == 15);
       }
-    }
-
-    //OK capacity is full, let's index a new doc
 
-    new UpdateRequest()
-        .add(id, "10", "a_s", "hello0", "a_i", "1", "a_f", "10")
-        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+      //Now let's wait until the internal queue fills up
 
-    //Now lets clear the existing docs in the queue 9, plus 3 more to get passed the run that was blocked. The next run should
-    //have the tuples with the updated count.
-    for(int i=0; i<12;i++) {
-      daemonStream.read();
-    }
+      while (daemonStream.remainingCapacity() > 0) {
+        try {
+          Thread.sleep(1000);
+        } catch (Exception e) {
 
-    //And rerun the loop. It should have a new count for hello0
-    for(int i=0; i<4; i++) {
-      Tuple tuple = daemonStream.read(); // Reads from the queue
-      String bucket = tuple.getString("a_s");
-      Double sumi = tuple.getDouble("sum(a_i)");
+        }
+      }
 
-      //System.out.println("#################################### Bucket 1:"+bucket);
-      assertTrue(bucket.equals("hello0"));
-      assertTrue(sumi.doubleValue() == 18.0D);
+      //OK capacity is full, let's index a new doc
 
-      tuple = daemonStream.read();
-      bucket = tuple.getString("a_s");
-      sumi = tuple.getDouble("sum(a_i)");
+      new UpdateRequest()
+          .add(id, "10", "a_s", "hello0", "a_i", "1", "a_f", "10")
+          .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-      //System.out.println("#################################### Bucket 2:"+bucket);
-      assertTrue(bucket.equals("hello3"));
-      assertTrue(sumi.doubleValue() == 38.0D);
+      //Now let's clear the 9 existing docs in the queue, plus 3 more to get past the run that was blocked. The next run should
+      //have the tuples with the updated count.
+      for (int i = 0; i < 12; i++) {
+        daemonStream.read();
+      }
 
-      tuple = daemonStream.read();
-      bucket = tuple.getString("a_s");
-      sumi = tuple.getDouble("sum(a_i)");
-      //System.out.println("#################################### Bucket 3:"+bucket);
-      assertTrue(bucket.equals("hello4"));
-      assertTrue(sumi.longValue() == 15);
+      //And rerun the loop. It should have a new count for hello0
+      for (int i = 0; i < 4; i++) {
+        Tuple tuple = daemonStream.read(); // Reads from the queue
+        String bucket = tuple.getString("a_s");
+        Double sumi = tuple.getDouble("sum(a_i)");
+
+        //System.out.println("#################################### Bucket 1:"+bucket);
+        assertTrue(bucket.equals("hello0"));
+        assertTrue(sumi.doubleValue() == 18.0D);
+
+        tuple = daemonStream.read();
+        bucket = tuple.getString("a_s");
+        sumi = tuple.getDouble("sum(a_i)");
+
+        //System.out.println("#################################### Bucket 2:"+bucket);
+        assertTrue(bucket.equals("hello3"));
+        assertTrue(sumi.doubleValue() == 38.0D);
+
+        tuple = daemonStream.read();
+        bucket = tuple.getString("a_s");
+        sumi = tuple.getDouble("sum(a_i)");
+        //System.out.println("#################################### Bucket 3:"+bucket);
+        assertTrue(bucket.equals("hello4"));
+        assertTrue(sumi.longValue() == 15);
+      }
+    } finally {
+      daemonStream.close(); //This should stop the daemon thread
+      solrClientCache.close();
     }
-
-    daemonStream.close(); //This should stop the daemon thread
-
   }
 
 
@@ -1412,96 +1565,103 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     StreamExpression expression;
     TupleStream stream;
     List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+    try {
+      expression = StreamExpressionParser.parse("rollup("
+          + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"a_s,a_i,a_f\", sort=\"a_s asc\"),"
+          + "over=\"a_s\","
+          + "sum(a_i),"
+          + "sum(a_f),"
+          + "min(a_i),"
+          + "min(a_f),"
+          + "max(a_i),"
+          + "max(a_f),"
+          + "avg(a_i),"
+          + "avg(a_f),"
+          + "count(*),"
+          + ")");
+      stream = factory.constructStream(expression);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
 
-    expression = StreamExpressionParser.parse("rollup("
-                                              + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"a_s,a_i,a_f\", sort=\"a_s asc\"),"
-                                              + "over=\"a_s\","
-                                              + "sum(a_i),"
-                                              + "sum(a_f),"
-                                              + "min(a_i),"
-                                              + "min(a_f),"
-                                              + "max(a_i),"
-                                              + "max(a_f),"
-                                              + "avg(a_i),"
-                                              + "avg(a_f),"
-                                              + "count(*),"
-                                              + ")");
-    stream = factory.constructStream(expression);
-    tuples = getTuples(stream);
-
-    assert(tuples.size() == 3);
-
-    //Test Long and Double Sums
-
-    Tuple tuple = tuples.get(0);
-    String bucket = tuple.getString("a_s");
-    Double sumi = tuple.getDouble("sum(a_i)");
-    Double sumf = tuple.getDouble("sum(a_f)");
-    Double mini = tuple.getDouble("min(a_i)");
-    Double minf = tuple.getDouble("min(a_f)");
-    Double maxi = tuple.getDouble("max(a_i)");
-    Double maxf = tuple.getDouble("max(a_f)");
-    Double avgi = tuple.getDouble("avg(a_i)");
-    Double avgf = tuple.getDouble("avg(a_f)");
-    Double count = tuple.getDouble("count(*)");
+      assert (tuples.size() == 3);
 
-    assertTrue(bucket.equals("hello0"));
-    assertTrue(sumi.doubleValue() == 17.0D);
-    assertTrue(sumf.doubleValue() == 18.0D);
-    assertTrue(mini.doubleValue() == 0.0D);
-    assertTrue(minf.doubleValue() == 1.0D);
-    assertTrue(maxi.doubleValue() == 14.0D);
-    assertTrue(maxf.doubleValue() == 10.0D);
-    assertTrue(avgi.doubleValue() == 4.25D);
-    assertTrue(avgf.doubleValue() == 4.5D);
-    assertTrue(count.doubleValue() == 4);
+      //Test Long and Double Sums
 
-    tuple = tuples.get(1);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
+      Tuple tuple = tuples.get(0);
+      String bucket = tuple.getString("a_s");
+      Double sumi = tuple.getDouble("sum(a_i)");
+      Double sumf = tuple.getDouble("sum(a_f)");
+      Double mini = tuple.getDouble("min(a_i)");
+      Double minf = tuple.getDouble("min(a_f)");
+      Double maxi = tuple.getDouble("max(a_i)");
+      Double maxf = tuple.getDouble("max(a_f)");
+      Double avgi = tuple.getDouble("avg(a_i)");
+      Double avgf = tuple.getDouble("avg(a_f)");
+      Double count = tuple.getDouble("count(*)");
 
-    assertTrue(bucket.equals("hello3"));
-    assertTrue(sumi.doubleValue() == 38.0D);
-    assertTrue(sumf.doubleValue() == 26.0D);
-    assertTrue(mini.doubleValue() == 3.0D);
-    assertTrue(minf.doubleValue() == 3.0D);
-    assertTrue(maxi.doubleValue() == 13.0D);
-    assertTrue(maxf.doubleValue() == 9.0D);
-    assertTrue(avgi.doubleValue() == 9.5D);
-    assertTrue(avgf.doubleValue() == 6.5D);
-    assertTrue(count.doubleValue() == 4);
+      assertTrue(bucket.equals("hello0"));
+      assertTrue(sumi.doubleValue() == 17.0D);
+      assertTrue(sumf.doubleValue() == 18.0D);
+      assertTrue(mini.doubleValue() == 0.0D);
+      assertTrue(minf.doubleValue() == 1.0D);
+      assertTrue(maxi.doubleValue() == 14.0D);
+      assertTrue(maxf.doubleValue() == 10.0D);
+      assertTrue(avgi.doubleValue() == 4.25D);
+      assertTrue(avgf.doubleValue() == 4.5D);
+      assertTrue(count.doubleValue() == 4);
+
+      tuple = tuples.get(1);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
 
-    tuple = tuples.get(2);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
+      assertTrue(bucket.equals("hello3"));
+      assertTrue(sumi.doubleValue() == 38.0D);
+      assertTrue(sumf.doubleValue() == 26.0D);
+      assertTrue(mini.doubleValue() == 3.0D);
+      assertTrue(minf.doubleValue() == 3.0D);
+      assertTrue(maxi.doubleValue() == 13.0D);
+      assertTrue(maxf.doubleValue() == 9.0D);
+      assertTrue(avgi.doubleValue() == 9.5D);
+      assertTrue(avgf.doubleValue() == 6.5D);
+      assertTrue(count.doubleValue() == 4);
+
+      tuple = tuples.get(2);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
 
-    assertTrue(bucket.equals("hello4"));
-    assertTrue(sumi.longValue() == 15);
-    assertTrue(sumf.doubleValue() == 11.0D);
-    assertTrue(mini.doubleValue() == 4.0D);
-    assertTrue(minf.doubleValue() == 4.0D);
-    assertTrue(maxi.doubleValue() == 11.0D);
-    assertTrue(maxf.doubleValue() == 7.0D);
-    assertTrue(avgi.doubleValue() == 7.5D);
-    assertTrue(avgf.doubleValue() == 5.5D);
-    assertTrue(count.doubleValue() == 2);
+      assertTrue(bucket.equals("hello4"));
+      assertTrue(sumi.longValue() == 15);
+      assertTrue(sumf.doubleValue() == 11.0D);
+      assertTrue(mini.doubleValue() == 4.0D);
+      assertTrue(minf.doubleValue() == 4.0D);
+      assertTrue(maxi.doubleValue() == 11.0D);
+      assertTrue(maxf.doubleValue() == 7.0D);
+      assertTrue(avgi.doubleValue() == 7.5D);
+      assertTrue(avgf.doubleValue() == 5.5D);
+      assertTrue(count.doubleValue() == 2);
 
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -1588,18 +1748,27 @@ public class StreamExpressionTest extends SolrCloudTestCase {
         .withFunctionName("top", RankStream.class)
         .withFunctionName("group", ReducerStream.class)
         .withFunctionName("parallel", ParallelStream.class);
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
 
-    ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"), over=\"a_f\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_f asc\")");
 
-    List<Tuple> tuples = getTuples(pstream);
-    assert(tuples.size() == 5);
-    assertOrder(tuples, 0, 1, 3, 4, 6);
+    try {
 
-    //Test the eofTuples
+      ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"), over=\"a_f\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_f asc\")");
+      pstream.setStreamContext(streamContext);
+      List<Tuple> tuples = getTuples(pstream);
+      assert (tuples.size() == 5);
+      assertOrder(tuples, 0, 1, 3, 4, 6);
 
-    Map<String,Tuple> eofTuples = pstream.getEofTuples();
-    assert(eofTuples.size() == 2); //There should be an EOF tuple for each worker.
+      //Test the eofTuples
 
+      Map<String, Tuple> eofTuples = pstream.getEofTuples();
+      assert (eofTuples.size() == 2); //There should be an EOF tuple for each worker.
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -1666,23 +1835,32 @@ public class StreamExpressionTest extends SolrCloudTestCase {
 
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
     String zkHost = cluster.getZkServer().getZkAddress();
     StreamFactory streamFactory = new StreamFactory().withCollectionZkHost(COLLECTIONORALIAS, zkHost)
         .withFunctionName("shuffle", ShuffleStream.class)
         .withFunctionName("unique", UniqueStream.class)
         .withFunctionName("parallel", ParallelStream.class);
 
-    ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", unique(shuffle(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"), over=\"a_f\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_f asc\")");
-
-    List<Tuple> tuples = getTuples(pstream);
-    assert(tuples.size() == 6);
-    assertOrder(tuples, 0, 1, 3, 4, 6, 56);
-
-    //Test the eofTuples
-
-    Map<String,Tuple> eofTuples = pstream.getEofTuples();
-    assert(eofTuples.size() == 2); //There should be an EOF tuple for each worker.
-    assert(pstream.toExpression(streamFactory).toString().contains("shuffle"));
+    try {
+      ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", unique(shuffle(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"), over=\"a_f\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_f asc\")");
+      pstream.setStreamFactory(streamFactory);
+      pstream.setStreamContext(streamContext);
+      List<Tuple> tuples = getTuples(pstream);
+      assert (tuples.size() == 6);
+      assertOrder(tuples, 0, 1, 3, 4, 6, 56);
+
+      //Test the eofTuples
+
+      Map<String, Tuple> eofTuples = pstream.getEofTuples();
+      assert (eofTuples.size() == 2); //There should be an EOF tuple for each worker.
+      assert (pstream.toExpression(streamFactory).toString().contains("shuffle"));
+    } finally {
+      solrClientCache.close();
+    }
   }
 
 
@@ -1702,6 +1880,11 @@ public class StreamExpressionTest extends SolrCloudTestCase {
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+
     String zkHost = cluster.getZkServer().getZkAddress();
     StreamFactory streamFactory = new StreamFactory().withCollectionZkHost(COLLECTIONORALIAS, zkHost)
         .withFunctionName("search", CloudSolrStream.class)
@@ -1709,54 +1892,62 @@ public class StreamExpressionTest extends SolrCloudTestCase {
         .withFunctionName("reduce", ReducerStream.class)
         .withFunctionName("parallel", ParallelStream.class);
 
-    ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", " +
-                                                                                    "reduce(" +
-                                                                                              "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc,a_f asc\", partitionKeys=\"a_s\"), " +
-                                                                                              "by=\"a_s\"," +
-                                                                                              "group(sort=\"a_i asc\", n=\"5\")), " +
-                                                                                    "workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s asc\")");
 
-    List<Tuple> tuples = getTuples(pstream);
+    try {
+      ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", " +
+          "reduce(" +
+          "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc,a_f asc\", partitionKeys=\"a_s\"), " +
+          "by=\"a_s\"," +
+          "group(sort=\"a_i asc\", n=\"5\")), " +
+          "workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_s asc\")");
+
+      pstream.setStreamContext(streamContext);
 
-    assert(tuples.size() == 3);
+      List<Tuple> tuples = getTuples(pstream);
 
-    Tuple t0 = tuples.get(0);
-    List<Map> maps0 = t0.getMaps("group");
-    assertMaps(maps0, 0, 1, 2, 9);
+      assert (tuples.size() == 3);
 
-    Tuple t1 = tuples.get(1);
-    List<Map> maps1 = t1.getMaps("group");
-    assertMaps(maps1, 3, 5, 7, 8);
+      Tuple t0 = tuples.get(0);
+      List<Map> maps0 = t0.getMaps("group");
+      assertMaps(maps0, 0, 1, 2, 9);
 
-    Tuple t2 = tuples.get(2);
-    List<Map> maps2 = t2.getMaps("group");
-    assertMaps(maps2, 4, 6);
+      Tuple t1 = tuples.get(1);
+      List<Map> maps1 = t1.getMaps("group");
+      assertMaps(maps1, 3, 5, 7, 8);
 
+      Tuple t2 = tuples.get(2);
+      List<Map> maps2 = t2.getMaps("group");
+      assertMaps(maps2, 4, 6);
 
-    pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", " +
-                                                                      "reduce(" +
-                                                                              "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc,a_f asc\", partitionKeys=\"a_s\"), " +
-                                                                              "by=\"a_s\", " +
-                                                                              "group(sort=\"a_i desc\", n=\"5\")),"+
-                                                                      "workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s desc\")");
 
-    tuples = getTuples(pstream);
+      pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", " +
+          "reduce(" +
+          "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc,a_f asc\", partitionKeys=\"a_s\"), " +
+          "by=\"a_s\", " +
+          "group(sort=\"a_i desc\", n=\"5\"))," +
+          "workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_s desc\")");
 
-    assert(tuples.size() == 3);
+      pstream.setStreamContext(streamContext);
+      tuples = getTuples(pstream);
 
-    t0 = tuples.get(0);
-    maps0 = t0.getMaps("group");
-    assertMaps(maps0, 6, 4);
+      assert (tuples.size() == 3);
+
+      t0 = tuples.get(0);
+      maps0 = t0.getMaps("group");
+      assertMaps(maps0, 6, 4);
 
 
-    t1 = tuples.get(1);
-    maps1 = t1.getMaps("group");
-    assertMaps(maps1, 8, 7, 5, 3);
+      t1 = tuples.get(1);
+      maps1 = t1.getMaps("group");
+      assertMaps(maps1, 8, 7, 5, 3);
 
 
-    t2 = tuples.get(2);
-    maps2 = t2.getMaps("group");
-    assertMaps(maps2, 9, 2, 1, 0);
+      t2 = tuples.get(2);
+      maps2 = t2.getMaps("group");
+      assertMaps(maps2, 9, 2, 1, 0);
+    } finally {
+      solrClientCache.close();
+    }
 
   }
 
@@ -1784,17 +1975,24 @@ public class StreamExpressionTest extends SolrCloudTestCase {
         .withFunctionName("group", ReducerStream.class)
         .withFunctionName("parallel", ParallelStream.class);
 
-    ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel("
-        + COLLECTIONORALIAS + ", "
-        + "top("
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+    try {
+      ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel("
+          + COLLECTIONORALIAS + ", "
+          + "top("
           + "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), "
           + "n=\"11\", "
-          + "sort=\"a_i desc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_i desc\")");
-
-    List<Tuple> tuples = getTuples(pstream);
+          + "sort=\"a_i desc\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i desc\")");
+      pstream.setStreamContext(streamContext);
+      List<Tuple> tuples = getTuples(pstream);
 
-    assert(tuples.size() == 10);
-    assertOrder(tuples, 10,9,8,7,6,5,4,3,2,0);
+      assert (tuples.size() == 10);
+      assertOrder(tuples, 10, 9, 8, 7, 6, 5, 4, 3, 2, 0);
+    } finally {
+      solrClientCache.close();
+    }
 
   }
 
@@ -1823,24 +2021,29 @@ public class StreamExpressionTest extends SolrCloudTestCase {
         .withFunctionName("merge", MergeStream.class)
         .withFunctionName("parallel", ParallelStream.class);
 
-    //Test ascending
-    ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", merge(search(" + COLLECTIONORALIAS + ", q=\"id:(4 1 8 7 9)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), search(" + COLLECTIONORALIAS + ", q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), on=\"a_i asc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_i asc\")");
-
-    List<Tuple> tuples = getTuples(pstream);
-
-
-
-    assert(tuples.size() == 9);
-    assertOrder(tuples, 0, 1, 2, 3, 4, 7, 6, 8, 9);
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+    try {
+      //Test ascending
+      ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", merge(search(" + COLLECTIONORALIAS + ", q=\"id:(4 1 8 7 9)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), search(" + COLLECTIONORALIAS + ", q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), on=\"a_i asc\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i asc\")");
+      pstream.setStreamContext(streamContext);
+      List<Tuple> tuples = getTuples(pstream);
 
-    //Test descending
+      assert (tuples.size() == 9);
+      assertOrder(tuples, 0, 1, 2, 3, 4, 7, 6, 8, 9);
 
-    pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", merge(search(" + COLLECTIONORALIAS + ", q=\"id:(4 1 8 9)\", fl=\"id,a_s,a_i\", sort=\"a_i desc\", partitionKeys=\"a_i\"), search(" + COLLECTIONORALIAS + ", q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i desc\", partitionKeys=\"a_i\"), on=\"a_i desc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_i desc\")");
+      //Test descending
 
-    tuples = getTuples(pstream);
+      pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", merge(search(" + COLLECTIONORALIAS + ", q=\"id:(4 1 8 9)\", fl=\"id,a_s,a_i\", sort=\"a_i desc\", partitionKeys=\"a_i\"), search(" + COLLECTIONORALIAS + ", q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i desc\", partitionKeys=\"a_i\"), on=\"a_i desc\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i desc\")");
+      pstream.setStreamContext(streamContext);
+      tuples = getTuples(pstream);
 
-    assert(tuples.size() == 8);
-    assertOrder(tuples, 9, 8, 6, 4, 3, 2, 1, 0);
+      assert (tuples.size() == 8);
+      assertOrder(tuples, 9, 8, 6, 4, 3, 2, 1, 0);
+    } finally {
+      solrClientCache.close();
+    }
 
   }
 
@@ -1869,104 +2072,115 @@ public class StreamExpressionTest extends SolrCloudTestCase {
       .withFunctionName("min", MinMetric.class)
       .withFunctionName("max", MaxMetric.class)
       .withFunctionName("avg", MeanMetric.class)
-      .withFunctionName("count", CountMetric.class);     
-    
-    StreamExpression expression;
-    TupleStream stream;
-    List<Tuple> tuples;
+      .withFunctionName("count", CountMetric.class);
 
-    expression = StreamExpressionParser.parse("parallel(" + COLLECTIONORALIAS + ","
-                                              + "rollup("
-                                                + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"a_s,a_i,a_f\", sort=\"a_s asc\", partitionKeys=\"a_s\"),"
-                                                + "over=\"a_s\","
-                                                + "sum(a_i),"
-                                                + "sum(a_f),"
-                                                + "min(a_i),"
-                                                + "min(a_f),"
-                                                + "max(a_i),"
-                                                + "max(a_f),"
-                                                + "avg(a_i),"
-                                                + "avg(a_f),"
-                                                + "count(*)"
-                                              + "),"
-                                              + "workers=\"2\", zkHost=\""+cluster.getZkServer().getZkAddress()+"\", sort=\"a_s asc\")"
-                                              );
-    stream = factory.constructStream(expression);
-    tuples = getTuples(stream);
 
-    assert(tuples.size() == 3);
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
 
-    //Test Long and Double Sums
+    StreamExpression expression;
+    TupleStream stream;
+    List<Tuple> tuples;
 
-    Tuple tuple = tuples.get(0);
-    String bucket = tuple.getString("a_s");
-    Double sumi = tuple.getDouble("sum(a_i)");
-    Double sumf = tuple.getDouble("sum(a_f)");
-    Double mini = tuple.getDouble("min(a_i)");
-    Double minf = tuple.getDouble("min(a_f)");
-    Double maxi = tuple.getDouble("max(a_i)");
-    Double maxf = tuple.getDouble("max(a_f)");
-    Double avgi = tuple.getDouble("avg(a_i)");
-    Double avgf = tuple.getDouble("avg(a_f)");
-    Double count = tuple.getDouble("count(*)");
+    try {
+      expression = StreamExpressionParser.parse("parallel(" + COLLECTIONORALIAS + ","
+              + "rollup("
+              + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"a_s,a_i,a_f\", sort=\"a_s asc\", partitionKeys=\"a_s\"),"
+              + "over=\"a_s\","
+              + "sum(a_i),"
+              + "sum(a_f),"
+              + "min(a_i),"
+              + "min(a_f),"
+              + "max(a_i),"
+              + "max(a_f),"
+              + "avg(a_i),"
+              + "avg(a_f),"
+              + "count(*)"
+              + "),"
+              + "workers=\"2\", zkHost=\"" + cluster.getZkServer().getZkAddress() + "\", sort=\"a_s asc\")"
+      );
 
-    assertTrue(bucket.equals("hello0"));
-    assertTrue(sumi.doubleValue() == 17.0D);
-    assertTrue(sumf.doubleValue() == 18.0D);
-    assertTrue(mini.doubleValue() == 0.0D);
-    assertTrue(minf.doubleValue() == 1.0D);
-    assertTrue(maxi.doubleValue() == 14.0D);
-    assertTrue(maxf.doubleValue() == 10.0D);
-    assertTrue(avgi.doubleValue() == 4.25D);
-    assertTrue(avgf.doubleValue() == 4.5D);
-    assertTrue(count.doubleValue() == 4);
 
-    tuple = tuples.get(1);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
+      stream = factory.constructStream(expression);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
 
-    assertTrue(bucket.equals("hello3"));
-    assertTrue(sumi.doubleValue() == 38.0D);
-    assertTrue(sumf.doubleValue() == 26.0D);
-    assertTrue(mini.doubleValue() == 3.0D);
-    assertTrue(minf.doubleValue() == 3.0D);
-    assertTrue(maxi.doubleValue() == 13.0D);
-    assertTrue(maxf.doubleValue() == 9.0D);
-    assertTrue(avgi.doubleValue() == 9.5D);
-    assertTrue(avgf.doubleValue() == 6.5D);
-    assertTrue(count.doubleValue() == 4);
+      assert (tuples.size() == 3);
 
-    tuple = tuples.get(2);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
+      //Test Long and Double Sums
 
-    assertTrue(bucket.equals("hello4"));
-    assertTrue(sumi.longValue() == 15);
-    assertTrue(sumf.doubleValue() == 11.0D);
-    assertTrue(mini.doubleValue() == 4.0D);
-    assertTrue(minf.doubleValue() == 4.0D);
-    assertTrue(maxi.doubleValue() == 11.0D);
-    assertTrue(maxf.doubleValue() == 7.0D);
-    assertTrue(avgi.doubleValue() == 7.5D);
-    assertTrue(avgf.doubleValue() == 5.5D);
-    assertTrue(count.doubleValue() == 2);
+      Tuple tuple = tuples.get(0);
+      String bucket = tuple.getString("a_s");
+      Double sumi = tuple.getDouble("sum(a_i)");
+      Double sumf = tuple.getDouble("sum(a_f)");
+      Double mini = tuple.getDouble("min(a_i)");
+      Double minf = tuple.getDouble("min(a_f)");
+      Double maxi = tuple.getDouble("max(a_i)");
+      Double maxf = tuple.getDouble("max(a_f)");
+      Double avgi = tuple.getDouble("avg(a_i)");
+      Double avgf = tuple.getDouble("avg(a_f)");
+      Double count = tuple.getDouble("count(*)");
+
+      assertTrue(bucket.equals("hello0"));
+      assertTrue(sumi.doubleValue() == 17.0D);
+      assertTrue(sumf.doubleValue() == 18.0D);
+      assertTrue(mini.doubleValue() == 0.0D);
+      assertTrue(minf.doubleValue() == 1.0D);
+      assertTrue(maxi.doubleValue() == 14.0D);
+      assertTrue(maxf.doubleValue() == 10.0D);
+      assertTrue(avgi.doubleValue() == 4.25D);
+      assertTrue(avgf.doubleValue() == 4.5D);
+      assertTrue(count.doubleValue() == 4);
+
+      tuple = tuples.get(1);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertTrue(bucket.equals("hello3"));
+      assertTrue(sumi.doubleValue() == 38.0D);
+      assertTrue(sumf.doubleValue() == 26.0D);
+      assertTrue(mini.doubleValue() == 3.0D);
+      assertTrue(minf.doubleValue() == 3.0D);
+      assertTrue(maxi.doubleValue() == 13.0D);
+      assertTrue(maxf.doubleValue() == 9.0D);
+      assertTrue(avgi.doubleValue() == 9.5D);
+      assertTrue(avgf.doubleValue() == 6.5D);
+      assertTrue(count.doubleValue() == 4);
+
+      tuple = tuples.get(2);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
 
+      assertTrue(bucket.equals("hello4"));
+      assertTrue(sumi.longValue() == 15);
+      assertTrue(sumf.doubleValue() == 11.0D);
+      assertTrue(mini.doubleValue() == 4.0D);
+      assertTrue(minf.doubleValue() == 4.0D);
+      assertTrue(maxi.doubleValue() == 11.0D);
+      assertTrue(maxf.doubleValue() == 7.0D);
+      assertTrue(avgi.doubleValue() == 7.5D);
+      assertTrue(avgf.doubleValue() == 5.5D);
+      assertTrue(count.doubleValue() == 2);
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -1994,52 +2208,62 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     StreamExpression expression;
     TupleStream stream;
     List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
     
     StreamFactory factory = new StreamFactory()
       .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
       .withFunctionName("search", CloudSolrStream.class)
       .withFunctionName("innerJoin", InnerJoinStream.class);
-    
-    // Basic test
-    expression = StreamExpressionParser.parse("innerJoin("
-                                                + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
-                                                + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc\"),"
-                                                + "on=\"join1_i=join1_i, join2_s=join2_s\")");
-    stream = new InnerJoinStream(expression, factory);
-    tuples = getTuples(stream);    
-    assert(tuples.size() == 8);
-    assertOrder(tuples, 1, 1, 15, 15, 3, 4, 5, 7);
-
-    // Basic desc
-    expression = StreamExpressionParser.parse("innerJoin("
-                                                + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
-                                                + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
-                                                + "on=\"join1_i=join1_i, join2_s=join2_s\")");
-    stream = new InnerJoinStream(expression, factory);
-    tuples = getTuples(stream);    
-    assert(tuples.size() == 8);
-    assertOrder(tuples, 7,3,4,5,1,1,15,15);
-    
-    // Results in both searches, no join matches
-    expression = StreamExpressionParser.parse("innerJoin("
-        + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\"),"
-        + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\", aliases=\"id=right.id, join1_i=right.join1_i, join2_s=right.join2_s, ident_s=right.ident_s\"),"
-        + "on=\"ident_s=right.ident_s\")");
-    stream = new InnerJoinStream(expression, factory);
-    tuples = getTuples(stream);    
-    assert(tuples.size() == 0);
-    
-    // Differing field names
-    expression = StreamExpressionParser.parse("innerJoin("
-                                                + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
-                                                + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"join3_i,join2_s,ident_s\", sort=\"join3_i asc, join2_s asc\", aliases=\"join3_i=aliasesField\"),"
-                                                + "on=\"join1_i=aliasesField, join2_s=join2_s\")");
-    stream = new InnerJoinStream(expression, factory);
-    tuples = getTuples(stream);
-    
-    assert(tuples.size() == 8);
-    assertOrder(tuples, 1, 1, 15, 15, 3, 4, 5, 7);
 
+    try {
+      // Basic test
+      expression = StreamExpressionParser.parse("innerJoin("
+          + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
+          + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc\"),"
+          + "on=\"join1_i=join1_i, join2_s=join2_s\")");
+      stream = new InnerJoinStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assert (tuples.size() == 8);
+      assertOrder(tuples, 1, 1, 15, 15, 3, 4, 5, 7);
+
+      // Basic desc
+      expression = StreamExpressionParser.parse("innerJoin("
+          + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+          + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+          + "on=\"join1_i=join1_i, join2_s=join2_s\")");
+      stream = new InnerJoinStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assert (tuples.size() == 8);
+      assertOrder(tuples, 7, 3, 4, 5, 1, 1, 15, 15);
+
+      // Results in both searches, no join matches
+      expression = StreamExpressionParser.parse("innerJoin("
+          + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\"),"
+          + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\", aliases=\"id=right.id, join1_i=right.join1_i, join2_s=right.join2_s, ident_s=right.ident_s\"),"
+          + "on=\"ident_s=right.ident_s\")");
+      stream = new InnerJoinStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assert (tuples.size() == 0);
+
+      // Differing field names
+      expression = StreamExpressionParser.parse("innerJoin("
+          + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
+          + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"join3_i,join2_s,ident_s\", sort=\"join3_i asc, join2_s asc\", aliases=\"join3_i=aliasesField\"),"
+          + "on=\"join1_i=aliasesField, join2_s=join2_s\")");
+      stream = new InnerJoinStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 8);
+      assertOrder(tuples, 1, 1, 15, 15, 3, 4, 5, 7);
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -2067,6 +2291,9 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     StreamExpression expression;
     TupleStream stream;
     List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
     
     StreamFactory factory = new StreamFactory()
       .withCollectionZkHost(COLLECTIONORALIAS, clus

<TRUNCATED>

[3/4] lucene-solr:master: SOLR-10274: The search Streaming Expression should work in non-SolrCloud mode

Posted by jb...@apache.org.
SOLR-10274: The search Streaming Expression should work in non-SolrCloud mode


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/06a55b73
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/06a55b73
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/06a55b73

Branch: refs/heads/master
Commit: 06a55b73b97db0a2cff490dcf40670079a953f60
Parents: 57c5837
Author: Joel Bernstein <jb...@apache.org>
Authored: Tue Apr 11 15:17:03 2017 -0400
Committer: Joel Bernstein <jb...@apache.org>
Committed: Tue Apr 11 15:24:42 2017 -0400

----------------------------------------------------------------------
 .../client/solrj/io/stream/CloudSolrStream.java |   41 +-
 .../client/solrj/io/stream/ParallelStream.java  |   27 +-
 .../client/solrj/io/stream/StreamContext.java   |    4 +
 .../client/solrj/io/stream/JDBCStreamTest.java  |  308 +-
 .../io/stream/SelectWithEvaluatorsTest.java     |   37 +-
 .../solrj/io/stream/StreamExpressionTest.java   | 3783 ++++++++++--------
 .../client/solrj/io/stream/StreamingTest.java   | 2505 ++++++------
 7 files changed, 3652 insertions(+), 3053 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/06a55b73/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
index 7161dc4..6d1764a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
@@ -26,8 +26,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -35,7 +33,6 @@ import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
 import org.apache.solr.client.solrj.io.comp.FieldComparator;
@@ -52,9 +49,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 import org.apache.solr.common.cloud.Aliases;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.MapSolrParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
@@ -178,9 +173,11 @@ public class CloudSolrStream extends TupleStream implements Expressible {
     else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
       zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
     }
+    /*
     if(null == zkHost){
       throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
     }
+    */
     
     // We've got all the required items
     init(collectionName, zkHost, mParams);
@@ -299,14 +296,6 @@ public class CloudSolrStream extends TupleStream implements Expressible {
     this.tuples = new TreeSet();
     this.solrStreams = new ArrayList();
     this.eofTuples = Collections.synchronizedMap(new HashMap());
-    if (this.streamContext != null && this.streamContext.getSolrClientCache() != null) {
-      this.cloudSolrClient = this.streamContext.getSolrClientCache().getCloudSolrClient(zkHost);
-    } else {
-      this.cloudSolrClient = new Builder()
-          .withZkHost(zkHost)
-          .build();
-      this.cloudSolrClient.connect();
-    }
     constructStreams();
     openStreams();
   }
@@ -400,29 +389,15 @@ public class CloudSolrStream extends TupleStream implements Expressible {
 
   protected void constructStreams() throws IOException {
     try {
-      ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
-      ClusterState clusterState = zkStateReader.getClusterState();
 
-      Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, true);
+      List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);
 
       ModifiableSolrParams mParams = new ModifiableSolrParams(params);
       mParams = adjustParams(mParams);
       mParams.set(DISTRIB, "false"); // We are the aggregator.
 
-      Set<String> liveNodes = clusterState.getLiveNodes();
-      for(Slice slice : slices) {
-        Collection<Replica> replicas = slice.getReplicas();
-        List<Replica> shuffler = new ArrayList<>();
-        for(Replica replica : replicas) {
-          if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
-          shuffler.add(replica);
-        }
-
-        Collections.shuffle(shuffler, new Random());
-        Replica rep = shuffler.get(0);
-        ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
-        String url = zkProps.getCoreUrl();
-        SolrStream solrStream = new SolrStream(url, mParams);
+      for(String shardUrl : shardUrls) {
+        SolrStream solrStream = new SolrStream(shardUrl, mParams);
         if(streamContext != null) {
           solrStream.setStreamContext(streamContext);
         }
@@ -468,12 +443,6 @@ public class CloudSolrStream extends TupleStream implements Expressible {
         solrStream.close();
       }
     }
-
-    if ((this.streamContext == null || this.streamContext.getSolrClientCache() == null) &&
-        cloudSolrClient != null) {
-
-      cloudSolrClient.close();
-    }
   }
   
   /** Return the stream sort - ie, the order in which records are returned */

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/06a55b73/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
index 87e1354..def4c03 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
@@ -263,27 +263,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
     try {
       Object pushStream = ((Expressible) tupleStream).toExpression(streamFactory);
 
-      ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
-
-      Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, true);
-
-      ClusterState clusterState = zkStateReader.getClusterState();
-      Set<String> liveNodes = clusterState.getLiveNodes();
-
-      List<Replica> shuffler = new ArrayList<>();
-      for(Slice slice : slices) {
-        Collection<Replica> replicas = slice.getReplicas();
-        for (Replica replica : replicas) {
-          if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
-          shuffler.add(replica);
-        }
-      }
-
-      if(workers > shuffler.size()) {
-        throw new IOException("Number of workers exceeds nodes in the worker collection");
-      }
-
-      Collections.shuffle(shuffler, new Random());
+      List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);
 
       for(int w=0; w<workers; w++) {
         ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
@@ -293,9 +273,8 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
 
         paramsLoc.set("expr", pushStream.toString());
         paramsLoc.set("qt","/stream");
-        Replica rep = shuffler.get(w);
-        ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
-        String url = zkProps.getCoreUrl();
+
+        String url = shardUrls.get(w);
         SolrStream solrStream = new SolrStream(url, paramsLoc);
         solrStreams.add(solrStream);
       }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/06a55b73/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
index 6cbf090..d1460ea 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
@@ -50,6 +50,10 @@ public class StreamContext implements Serializable{
     this.entries.put(key, value);
   }
 
+  public boolean containsKey(Object key) {
+    return entries.containsKey(key);
+  }
+
   public Map getEntries() {
     return this.entries;
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/06a55b73/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
index e55c837..9fff33a 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Locale;
 
 import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
 import org.apache.solr.client.solrj.io.comp.FieldComparator;
@@ -205,6 +206,10 @@ public class JDBCStreamTest extends SolrCloudTestCase {
       statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NO', 'Norway')");
       statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('AL', 'Algeria')");
     }
+
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
     
     // Load Solr
     new UpdateRequest()
@@ -217,18 +222,25 @@ public class JDBCStreamTest extends SolrCloudTestCase {
       .withFunctionName("search", CloudSolrStream.class);
     
     List<Tuple> tuples;
-    
-    // Simple 1
-    TupleStream jdbcStream = new JDBCStream("jdbc:hsqldb:mem:.", "select CODE,COUNTRY_NAME from COUNTRIES order by CODE", new FieldComparator("CODE", ComparatorOrder.ASCENDING));
-    TupleStream selectStream = new SelectStream(jdbcStream, new HashMap<String, String>(){{ put("CODE", "code_s"); put("COUNTRY_NAME", "name_s"); }});
-    TupleStream searchStream = factory.constructStream("search(" + COLLECTIONORALIAS + ", fl=\"code_s,name_s\",q=\"*:*\",sort=\"code_s asc\")");
-    TupleStream mergeStream = new MergeStream(new FieldComparator("code_s", ComparatorOrder.ASCENDING), new TupleStream[]{selectStream,searchStream});
-    
-    tuples = getTuples(mergeStream);
-    
-    assertEquals(7, tuples.size());
-    assertOrderOf(tuples, "code_s", "AL","CA","GB","NL","NO","NP","US");
-    assertOrderOf(tuples, "name_s", "Algeria", "Canada", "Great Britian", "Netherlands", "Norway", "Nepal", "United States");
+
+    try {
+      // Simple 1
+      TupleStream jdbcStream = new JDBCStream("jdbc:hsqldb:mem:.", "select CODE,COUNTRY_NAME from COUNTRIES order by CODE", new FieldComparator("CODE", ComparatorOrder.ASCENDING));
+      TupleStream selectStream = new SelectStream(jdbcStream, new HashMap<String, String>() {{
+        put("CODE", "code_s");
+        put("COUNTRY_NAME", "name_s");
+      }});
+      TupleStream searchStream = factory.constructStream("search(" + COLLECTIONORALIAS + ", fl=\"code_s,name_s\",q=\"*:*\",sort=\"code_s asc\")");
+      TupleStream mergeStream = new MergeStream(new FieldComparator("code_s", ComparatorOrder.ASCENDING), new TupleStream[]{selectStream, searchStream});
+      mergeStream.setStreamContext(streamContext);
+      tuples = getTuples(mergeStream);
+
+      assertEquals(7, tuples.size());
+      assertOrderOf(tuples, "code_s", "AL", "CA", "GB", "NL", "NO", "NP", "US");
+      assertOrderOf(tuples, "name_s", "Algeria", "Canada", "Great Britian", "Netherlands", "Norway", "Nepal", "United States");
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -277,32 +289,41 @@ public class JDBCStreamTest extends SolrCloudTestCase {
     String expression;
     TupleStream stream;
     List<Tuple> tuples;
-    
-    // Basic test
-    expression =   
-              "innerJoin("
-            + "  select("
-            + "    search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
-            + "    personId_i as personId,"
-            + "    rating_f as rating"
-            + "  ),"
-            + "  select("
-            + "    jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\"),"
-            + "    ID as personId,"
-            + "    NAME as personName,"
-            + "    COUNTRY_NAME as country"
-            + "  ),"
-            + "  on=\"personId\""
-            + ")";
-
-    stream = factory.constructStream(expression);
-    tuples = getTuples(stream);
-    
-    assertEquals(10, tuples.size());
-    assertOrderOf(tuples, "personId", 11,12,13,14,15,16,17,18,19,20);
-    assertOrderOf(tuples, "rating", 3.5d,5d,2.2d,4.3d,3.5d,3d,3d,4d,4.1d,4.8d);
-    assertOrderOf(tuples, "personName", "Emma","Grace","Hailey","Isabella","Lily","Madison","Mia","Natalie","Olivia","Samantha");
-    assertOrderOf(tuples, "country", "Netherlands","United States","Netherlands","Netherlands","Netherlands","United States","United States","Netherlands","Netherlands","United States");
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    try {
+      // Basic test
+      expression =
+          "innerJoin("
+              + "  select("
+              + "    search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+              + "    personId_i as personId,"
+              + "    rating_f as rating"
+              + "  ),"
+              + "  select("
+              + "    jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\"),"
+              + "    ID as personId,"
+              + "    NAME as personName,"
+              + "    COUNTRY_NAME as country"
+              + "  ),"
+              + "  on=\"personId\""
+              + ")";
+
+
+      stream = factory.constructStream(expression);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assertEquals(10, tuples.size());
+      assertOrderOf(tuples, "personId", 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
+      assertOrderOf(tuples, "rating", 3.5d, 5d, 2.2d, 4.3d, 3.5d, 3d, 3d, 4d, 4.1d, 4.8d);
+      assertOrderOf(tuples, "personName", "Emma", "Grace", "Hailey", "Isabella", "Lily", "Madison", "Mia", "Natalie", "Olivia", "Samantha");
+      assertOrderOf(tuples, "country", "Netherlands", "United States", "Netherlands", "Netherlands", "Netherlands", "United States", "United States", "Netherlands", "Netherlands", "United States");
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -351,58 +372,67 @@ public class JDBCStreamTest extends SolrCloudTestCase {
     String expression;
     TupleStream stream;
     List<Tuple> tuples;
-    
-    // Basic test for no alias
-    expression =
-              "innerJoin("
-            + "  select("
-            + "    search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
-            + "    personId_i as personId,"
-            + "    rating_f as rating"
-            + "  ),"
-            + "  select("
-            + "    jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\"),"
-            + "    ID as personId,"
-            + "    NAME as personName,"
-            + "    COUNTRY_NAME as country"
-            + "  ),"
-            + "  on=\"personId\""
-            + ")";
-
-    stream = factory.constructStream(expression);
-    tuples = getTuples(stream);
-    
-    assertEquals(10, tuples.size());
-    assertOrderOf(tuples, "personId", 11,12,13,14,15,16,17,18,19,20);
-    assertOrderOf(tuples, "rating", 3.5d,5d,2.2d,4.3d,3.5d,3d,3d,4d,4.1d,4.8d);
-    assertOrderOf(tuples, "personName", "Emma","Grace","Hailey","Isabella","Lily","Madison","Mia","Natalie","Olivia","Samantha");
-    assertOrderOf(tuples, "country", "Netherlands","United States","Netherlands","Netherlands","Netherlands","United States","United States","Netherlands","Netherlands","United States");
-    
-    // Basic test for alias
-    expression =   
-              "innerJoin("
-            + "  select("
-            + "    search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
-            + "    personId_i as personId,"
-            + "    rating_f as rating"
-            + "  ),"
-            + "  select("
-            + "    jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID as PERSONID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"PERSONID asc\"),"
-            + "    PERSONID as personId,"
-            + "    NAME as personName,"
-            + "    COUNTRY_NAME as country"
-            + "  ),"
-            + "  on=\"personId\""
-            + ")";
-
-    stream = factory.constructStream(expression);
-    tuples = getTuples(stream);
-    
-    assertEquals(10, tuples.size());
-    assertOrderOf(tuples, "personId", 11,12,13,14,15,16,17,18,19,20);
-    assertOrderOf(tuples, "rating", 3.5d,5d,2.2d,4.3d,3.5d,3d,3d,4d,4.1d,4.8d);
-    assertOrderOf(tuples, "personName", "Emma","Grace","Hailey","Isabella","Lily","Madison","Mia","Natalie","Olivia","Samantha");
-    assertOrderOf(tuples, "country", "Netherlands","United States","Netherlands","Netherlands","Netherlands","United States","United States","Netherlands","Netherlands","United States");
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    try {
+      // Basic test for no alias
+      expression =
+          "innerJoin("
+              + "  select("
+              + "    search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+              + "    personId_i as personId,"
+              + "    rating_f as rating"
+              + "  ),"
+              + "  select("
+              + "    jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\"),"
+              + "    ID as personId,"
+              + "    NAME as personName,"
+              + "    COUNTRY_NAME as country"
+              + "  ),"
+              + "  on=\"personId\""
+              + ")";
+
+      stream = factory.constructStream(expression);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assertEquals(10, tuples.size());
+      assertOrderOf(tuples, "personId", 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
+      assertOrderOf(tuples, "rating", 3.5d, 5d, 2.2d, 4.3d, 3.5d, 3d, 3d, 4d, 4.1d, 4.8d);
+      assertOrderOf(tuples, "personName", "Emma", "Grace", "Hailey", "Isabella", "Lily", "Madison", "Mia", "Natalie", "Olivia", "Samantha");
+      assertOrderOf(tuples, "country", "Netherlands", "United States", "Netherlands", "Netherlands", "Netherlands", "United States", "United States", "Netherlands", "Netherlands", "United States");
+
+      // Basic test for alias
+      expression =
+          "innerJoin("
+              + "  select("
+              + "    search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+              + "    personId_i as personId,"
+              + "    rating_f as rating"
+              + "  ),"
+              + "  select("
+              + "    jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID as PERSONID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"PERSONID asc\"),"
+              + "    PERSONID as personId,"
+              + "    NAME as personName,"
+              + "    COUNTRY_NAME as country"
+              + "  ),"
+              + "  on=\"personId\""
+              + ")";
+
+      stream = factory.constructStream(expression);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assertEquals(10, tuples.size());
+      assertOrderOf(tuples, "personId", 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
+      assertOrderOf(tuples, "rating", 3.5d, 5d, 2.2d, 4.3d, 3.5d, 3d, 3d, 4d, 4.1d, 4.8d);
+      assertOrderOf(tuples, "personName", "Emma", "Grace", "Hailey", "Isabella", "Lily", "Madison", "Mia", "Natalie", "Olivia", "Samantha");
+      assertOrderOf(tuples, "country", "Netherlands", "United States", "Netherlands", "Netherlands", "Netherlands", "United States", "United States", "Netherlands", "Netherlands", "United States");
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -439,7 +469,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
       statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (19,'Olivia','NL')");
       statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (20,'Samantha','US')");
     }
-    
+
     // Load solr data
     new UpdateRequest()
         .add(id, "1", "rating_f", "3.5", "personId_i", "11")
@@ -457,50 +487,58 @@ public class JDBCStreamTest extends SolrCloudTestCase {
     String expression;
     TupleStream stream;
     List<Tuple> tuples;
-    
-    // Basic test
-    expression =   
-              "rollup("
-            + "  hashJoin("
-            + "    hashed=select("
-            + "      search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
-            + "      personId_i as personId,"
-            + "      rating_f as rating"
-            + "    ),"
-            + "    select("
-            + "      jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by COUNTRIES.COUNTRY_NAME\", sort=\"COUNTRIES.COUNTRY_NAME asc\"),"
-            + "      ID as personId,"
-            + "      NAME as personName,"
-            + "      COUNTRY_NAME as country"
-            + "    ),"
-            + "    on=\"personId\""
-            + "  ),"
-            + "  over=\"country\","
-            + "  max(rating),"
-            + "  min(rating),"
-            + "  avg(rating),"
-            + "  count(*)"
-            + ")";
-
-    stream = factory.constructStream(expression);
-    tuples = getTuples(stream);
-    
-    assertEquals(2, tuples.size());
-    
-    Tuple tuple = tuples.get(0);
-    assertEquals("Netherlands",tuple.getString("country"));
-    assertTrue(4.3D == tuple.getDouble("max(rating)"));
-    assertTrue(2.2D == tuple.getDouble("min(rating)"));
-    assertTrue(3.6D == tuple.getDouble("avg(rating)"));
-    assertTrue(6D == tuple.getDouble("count(*)"));
-    
-    tuple = tuples.get(1);
-    assertEquals("United States",tuple.getString("country"));
-    assertTrue(5D == tuple.getDouble("max(rating)"));
-    assertTrue(3D == tuple.getDouble("min(rating)"));
-    assertTrue(3.95D == tuple.getDouble("avg(rating)"));
-    assertTrue(4D == tuple.getDouble("count(*)"));
-    
+
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    try {
+      // Basic test
+      expression =
+          "rollup("
+              + "  hashJoin("
+              + "    hashed=select("
+              + "      search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+              + "      personId_i as personId,"
+              + "      rating_f as rating"
+              + "    ),"
+              + "    select("
+              + "      jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by COUNTRIES.COUNTRY_NAME\", sort=\"COUNTRIES.COUNTRY_NAME asc\"),"
+              + "      ID as personId,"
+              + "      NAME as personName,"
+              + "      COUNTRY_NAME as country"
+              + "    ),"
+              + "    on=\"personId\""
+              + "  ),"
+              + "  over=\"country\","
+              + "  max(rating),"
+              + "  min(rating),"
+              + "  avg(rating),"
+              + "  count(*)"
+              + ")";
+
+      stream = factory.constructStream(expression);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assertEquals(2, tuples.size());
+
+      Tuple tuple = tuples.get(0);
+      assertEquals("Netherlands", tuple.getString("country"));
+      assertTrue(4.3D == tuple.getDouble("max(rating)"));
+      assertTrue(2.2D == tuple.getDouble("min(rating)"));
+      assertTrue(3.6D == tuple.getDouble("avg(rating)"));
+      assertTrue(6D == tuple.getDouble("count(*)"));
+
+      tuple = tuples.get(1);
+      assertEquals("United States", tuple.getString("country"));
+      assertTrue(5D == tuple.getDouble("max(rating)"));
+      assertTrue(3D == tuple.getDouble("min(rating)"));
+      assertTrue(3.95D == tuple.getDouble("avg(rating)"));
+      assertTrue(4D == tuple.getDouble("count(*)"));
+    } finally {
+      solrClientCache.close();
+    }
   }
   
   @Test(expected=IOException.class)

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/06a55b73/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
index b91df8d..75bf92d 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.eval.AddEvaluator;
 import org.apache.solr.client.solrj.io.eval.GreaterThanEvaluator;
@@ -92,6 +93,9 @@ public class SelectWithEvaluatorsTest extends SolrCloudTestCase {
     String clause;
     TupleStream stream;
     List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
     
     StreamFactory factory = new StreamFactory()
       .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
@@ -101,21 +105,24 @@ public class SelectWithEvaluatorsTest extends SolrCloudTestCase {
       .withFunctionName("if", IfThenElseEvaluator.class)
       .withFunctionName("gt", GreaterThanEvaluator.class)
       ;
-    
-    // Basic test
-    clause = "select("
-            +   "id,"
-            +   "add(b_i,c_d) as result,"
-            +   "search(collection1, q=*:*, fl=\"id,a_s,b_i,c_d,d_b\", sort=\"id asc\")"
-            + ")";
-    stream = factory.constructStream(clause);
-    tuples = getTuples(stream);
-    assertFields(tuples, "id", "result");
-    assertNotFields(tuples, "a_s", "b_i", "c_d", "d_b");
-    assertEquals(1, tuples.size());
-    assertDouble(tuples.get(0), "result", 4.3);
-    assertEquals(4.3, tuples.get(0).get("result"));
-
+    try {
+      // Basic test
+      clause = "select("
+          + "id,"
+          + "add(b_i,c_d) as result,"
+          + "search(collection1, q=*:*, fl=\"id,a_s,b_i,c_d,d_b\", sort=\"id asc\")"
+          + ")";
+      stream = factory.constructStream(clause);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assertFields(tuples, "id", "result");
+      assertNotFields(tuples, "a_s", "b_i", "c_d", "d_b");
+      assertEquals(1, tuples.size());
+      assertDouble(tuples.get(0), "result", 4.3);
+      assertEquals(4.3, tuples.get(0).get("result"));
+    } finally {
+      solrClientCache.close();
+    }
   }
   
   protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {


[4/4] lucene-solr:master: SOLR-10274: fix precommit

Posted by jb...@apache.org.
SOLR-10274: fix precommit


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/5ebd41d1
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/5ebd41d1
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/5ebd41d1

Branch: refs/heads/master
Commit: 5ebd41d13f646ea45a16e079c7fe4f3ff7e634cf
Parents: 06a55b7
Author: Joel Bernstein <jb...@apache.org>
Authored: Tue Apr 11 15:36:03 2017 -0400
Committer: Joel Bernstein <jb...@apache.org>
Committed: Tue Apr 11 15:36:03 2017 -0400

----------------------------------------------------------------------
 .../apache/solr/client/solrj/io/stream/ParallelStream.java  | 9 ---------
 1 file changed, 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5ebd41d1/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
index def4c03..58ba248 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
@@ -18,14 +18,10 @@ package org.apache.solr.client.solrj.io.stream;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Random;
-import java.util.Set;
 
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.FieldComparator;
@@ -38,11 +34,6 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
 
 import static org.apache.solr.common.params.CommonParams.DISTRIB;