You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/04/20 10:20:36 UTC

[03/23] lucene-solr:feature/autoscaling: Squash-merge from master.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d8df9f8c/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
index 2f2273e..0de3aa0 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
@@ -131,13 +131,20 @@ public void testUniqueStream() throws Exception {
       .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
       .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-  SolrParams sParams = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
-  CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
-  UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
-  List<Tuple> tuples = getTuples(ustream);
-  assertEquals(4, tuples.size());
-  assertOrder(tuples, 0,1,3,4);
-
+  StreamContext streamContext = new StreamContext();
+  SolrClientCache solrClientCache = new SolrClientCache();
+  streamContext.setSolrClientCache(solrClientCache);
+  try {
+    SolrParams sParams = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
+    UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
+    ustream.setStreamContext(streamContext);
+    List<Tuple> tuples = getTuples(ustream);
+    assertEquals(4, tuples.size());
+    assertOrder(tuples, 0, 1, 3, 4);
+  } finally {
+    solrClientCache.close();
+  }
 }
 
 @Test
@@ -167,15 +174,22 @@ public void testNonePartitionKeys() throws Exception {
       .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
       .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
       .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+  StreamContext streamContext = new StreamContext();
+  SolrClientCache solrClientCache = new SolrClientCache();
+  streamContext.setSolrClientCache(solrClientCache);
+  try {
 
-  SolrParams sParamsA = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none");
-  CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-  ParallelStream pstream = parallelStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
-  attachStreamFactory(pstream);
-  List<Tuple> tuples = getTuples(pstream);
-
-  assert(tuples.size() == (10 * numWorkers)); // Each tuple will be double counted.
+    SolrParams sParamsA = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+    ParallelStream pstream = parallelStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+    attachStreamFactory(pstream);
+    pstream.setStreamContext(streamContext);
+    List<Tuple> tuples = getTuples(pstream);
 
+    assert (tuples.size() == (10 * numWorkers)); // Each tuple will be double counted.
+  } finally {
+    solrClientCache.close();
+  }
 }
 
 @Test
@@ -193,19 +207,29 @@ public void testParallelUniqueStream() throws Exception {
       .add(id, "8", "a_s", "hello1", "a_i", "13", "a_f", "4")
       .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-  SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc", "partitionKeys", "a_f");
-  CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
-  UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
-  ParallelStream pstream = parallelStream(ustream, new FieldComparator("a_f", ComparatorOrder.ASCENDING));
-  attachStreamFactory(pstream);
-  List<Tuple> tuples = getTuples(pstream);
-  assertEquals(5, tuples.size());
-  assertOrder(tuples, 0, 1, 3, 4, 6);
+  StreamContext streamContext = new StreamContext();
+  SolrClientCache solrClientCache = new SolrClientCache();
+  streamContext.setSolrClientCache(solrClientCache);
+
+  try {
 
-  //Test the eofTuples
+    SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc", "partitionKeys", "a_f");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
+    UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
+    ParallelStream pstream = parallelStream(ustream, new FieldComparator("a_f", ComparatorOrder.ASCENDING));
+    attachStreamFactory(pstream);
+    pstream.setStreamContext(streamContext);
+    List<Tuple> tuples = getTuples(pstream);
+    assertEquals(5, tuples.size());
+    assertOrder(tuples, 0, 1, 3, 4, 6);
 
-  Map<String,Tuple> eofTuples = pstream.getEofTuples();
-  assertEquals(numWorkers, eofTuples.size()); //There should be an EOF tuple for each worker.
+    //Test the eofTuples
+
+    Map<String, Tuple> eofTuples = pstream.getEofTuples();
+    assertEquals(numWorkers, eofTuples.size()); //There should be an EOF tuple for each worker.
+  }finally {
+    solrClientCache.close();
+  }
 
 }
 
@@ -226,12 +250,21 @@ public void testMultipleFqClauses() throws Exception {
 
   streamFactory.withCollectionZkHost(COLLECTIONORALIAS, zkHost);
 
-  ModifiableSolrParams params = new ModifiableSolrParams(mapParams("q", "*:*", "fl", "id,a_i", 
-      "sort", "a_i asc", "fq", "a_ss:hello0", "fq", "a_ss:hello1"));
-  CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, params);
-  List<Tuple> tuples = getTuples(stream);
-  assertEquals("Multiple fq clauses should have been honored", 1, tuples.size());
-  assertEquals("should only have gotten back document 0", "0", tuples.get(0).getString("id"));
+  StreamContext streamContext = new StreamContext();
+  SolrClientCache solrClientCache = new SolrClientCache();
+  streamContext.setSolrClientCache(solrClientCache);
+
+  try {
+    ModifiableSolrParams params = new ModifiableSolrParams(mapParams("q", "*:*", "fl", "id,a_i",
+        "sort", "a_i asc", "fq", "a_ss:hello0", "fq", "a_ss:hello1"));
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, params);
+    stream.setStreamContext(streamContext);
+    List<Tuple> tuples = getTuples(stream);
+    assertEquals("Multiple fq clauses should have been honored", 1, tuples.size());
+    assertEquals("should only have gotten back document 0", "0", tuples.get(0).getString("id"));
+  } finally {
+    solrClientCache.close();
+  }
 }
 
 @Test
@@ -245,15 +278,20 @@ public void testRankStream() throws Exception {
       .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
       .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-
-  SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
-  CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
-  RankStream rstream = new RankStream(stream, 3, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
-  List<Tuple> tuples = getTuples(rstream);
-
-  assertEquals(3, tuples.size());
-  assertOrder(tuples, 4,3,2);
-
+  StreamContext streamContext = new StreamContext();
+  SolrClientCache solrClientCache = new SolrClientCache();
+  streamContext.setSolrClientCache(solrClientCache);
+  try {
+    SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
+    RankStream rstream = new RankStream(stream, 3, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
+    rstream.setStreamContext(streamContext);
+    List<Tuple> tuples = getTuples(rstream);
+    assertEquals(3, tuples.size());
+    assertOrder(tuples, 4, 3, 2);
+  } finally {
+    solrClientCache.close();
+  }
 }
 
 @Test
@@ -272,22 +310,30 @@ public void testParallelRankStream() throws Exception {
       .add(id, "10", "a_s", "hello1", "a_i", "10", "a_f", "1")
       .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-  SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
-  CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
-  RankStream rstream = new RankStream(stream, 11, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
-  ParallelStream pstream = parallelStream(rstream, new FieldComparator("a_i", ComparatorOrder.DESCENDING));    
-  attachStreamFactory(pstream);
-  List<Tuple> tuples = getTuples(pstream);
+  StreamContext streamContext = new StreamContext();
+  SolrClientCache solrClientCache = new SolrClientCache();
+  streamContext.setSolrClientCache(solrClientCache);
+  try {
+    SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
+    RankStream rstream = new RankStream(stream, 11, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
+    ParallelStream pstream = parallelStream(rstream, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
+    attachStreamFactory(pstream);
+    pstream.setStreamContext(streamContext);
+    List<Tuple> tuples = getTuples(pstream);
 
-  assertEquals(10, tuples.size());
-  assertOrder(tuples, 10,9,8,7,6,5,4,3,2,0);
+    assertEquals(10, tuples.size());
+    assertOrder(tuples, 10, 9, 8, 7, 6, 5, 4, 3, 2, 0);
+  } finally {
+    solrClientCache.close();
+  }
 
 }
 
 @Test
-public void testTrace() throws Exception {
+  public void testTrace() throws Exception {
 
-  new UpdateRequest()
+    new UpdateRequest()
       .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
       .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
       .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
@@ -300,15 +346,24 @@ public void testTrace() throws Exception {
       .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
       .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-  //Test with spaces in the parameter lists.
-  SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i,a_f", "sort", "a_s asc,a_f   asc");
-  CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-  stream.setTrace(true);
-  List<Tuple> tuples = getTuples(stream);
-    assertEquals(COLLECTIONORALIAS, tuples.get(0).get("_COLLECTION_"));
-    assertEquals(COLLECTIONORALIAS, tuples.get(1).get("_COLLECTION_"));
-    assertEquals(COLLECTIONORALIAS, tuples.get(2).get("_COLLECTION_"));
-    assertEquals(COLLECTIONORALIAS, tuples.get(3).get("_COLLECTION_"));
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    try {
+      //Test with spaces in the parameter lists.
+      SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i,a_f", "sort", "a_s asc,a_f   asc");
+      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+      stream.setTrace(true);
+      stream.setStreamContext(streamContext);
+      List<Tuple> tuples = getTuples(stream);
+      assertEquals(COLLECTIONORALIAS, tuples.get(0).get("_COLLECTION_"));
+      assertEquals(COLLECTIONORALIAS, tuples.get(1).get("_COLLECTION_"));
+      assertEquals(COLLECTIONORALIAS, tuples.get(2).get("_COLLECTION_"));
+      assertEquals(COLLECTIONORALIAS, tuples.get(3).get("_COLLECTION_"));
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -327,52 +382,60 @@ public void testTrace() throws Exception {
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-    //Test with spaces in the parameter lists.
-    SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i,  a_f", "sort", "a_s asc  ,  a_f   asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-    ReducerStream rstream  = new ReducerStream(stream,
-                                               new FieldEqualitor("a_s"),
-                                               new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
-
-    List<Tuple> tuples = getTuples(rstream);
-
-    assertEquals(3, tuples.size());
-
-    Tuple t0 = tuples.get(0);
-    List<Map> maps0 = t0.getMaps("group");
-    assertMaps(maps0, 0, 2, 1, 9);
-
-    Tuple t1 = tuples.get(1);
-    List<Map> maps1 = t1.getMaps("group");
-    assertMaps(maps1, 3, 5, 7, 8);
-
-    Tuple t2 = tuples.get(2);
-    List<Map> maps2 = t2.getMaps("group");
-    assertMaps(maps2, 4, 6);
-
-    //Test with spaces in the parameter lists using a comparator
-    sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i,  a_f", "sort", "a_s asc  ,  a_f   asc");
-    stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-    rstream = new ReducerStream(stream,
-                                new FieldComparator("a_s", ComparatorOrder.ASCENDING),
-                                new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
-
-    tuples = getTuples(rstream);
-
-    assertEquals(3, tuples.size());
-
-    t0 = tuples.get(0);
-    maps0 = t0.getMaps("group");
-    assertMaps(maps0, 9, 1, 2, 0);
-
-    t1 = tuples.get(1);
-    maps1 = t1.getMaps("group");
-    assertMaps(maps1, 8, 7, 5, 3);
-
-    t2 = tuples.get(2);
-    maps2 = t2.getMaps("group");
-    assertMaps(maps2, 6, 4);
-
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    try {
+      //Test with spaces in the parameter lists.
+      SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i,  a_f", "sort", "a_s asc  ,  a_f   asc");
+      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+      ReducerStream rstream = new ReducerStream(stream,
+          new FieldEqualitor("a_s"),
+          new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
+
+      rstream.setStreamContext(streamContext);
+      List<Tuple> tuples = getTuples(rstream);
+
+      assertEquals(3, tuples.size());
+
+      Tuple t0 = tuples.get(0);
+      List<Map> maps0 = t0.getMaps("group");
+      assertMaps(maps0, 0, 2, 1, 9);
+
+      Tuple t1 = tuples.get(1);
+      List<Map> maps1 = t1.getMaps("group");
+      assertMaps(maps1, 3, 5, 7, 8);
+
+      Tuple t2 = tuples.get(2);
+      List<Map> maps2 = t2.getMaps("group");
+      assertMaps(maps2, 4, 6);
+
+      //Test with spaces in the parameter lists using a comparator
+      sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i,  a_f", "sort", "a_s asc  ,  a_f   asc");
+      stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+      rstream = new ReducerStream(stream,
+          new FieldComparator("a_s", ComparatorOrder.ASCENDING),
+          new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
+      rstream.setStreamContext(streamContext);
+      tuples = getTuples(rstream);
+
+      assertEquals(3, tuples.size());
+
+      t0 = tuples.get(0);
+      maps0 = t0.getMaps("group");
+      assertMaps(maps0, 9, 1, 2, 0);
+
+      t1 = tuples.get(1);
+      maps1 = t1.getMaps("group");
+      assertMaps(maps1, 8, 7, 5, 3);
+
+      t2 = tuples.get(2);
+      maps2 = t2.getMaps("group");
+      assertMaps(maps2, 6, 4);
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -392,17 +455,24 @@ public void testTrace() throws Exception {
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-    //Test with spaces in the parameter lists.
-    SolrParams sParamsA = mapParams("q", "blah", "fl", "id,a_s, a_i,  a_f", "sort", "a_s asc  ,  a_f   asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-    ReducerStream rstream = new ReducerStream(stream,
-                                              new FieldEqualitor("a_s"),
-                                              new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
-
-    List<Tuple> tuples = getTuples(rstream);
-
-    assertEquals(0, tuples.size());
-
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    try {
+      //Test with spaces in the parameter lists.
+      SolrParams sParamsA = mapParams("q", "blah", "fl", "id,a_s, a_i,  a_f", "sort", "a_s asc  ,  a_f   asc");
+      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+      ReducerStream rstream = new ReducerStream(stream,
+          new FieldEqualitor("a_s"),
+          new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
+      rstream.setStreamContext(streamContext);
+      List<Tuple> tuples = getTuples(rstream);
+
+      assertEquals(0, tuples.size());
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -421,56 +491,65 @@ public void testTrace() throws Exception {
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-    SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
 
-    ReducerStream rstream = new ReducerStream(stream,
-                                              new FieldEqualitor("a_s"),
-                                              new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
-    ParallelStream pstream = parallelStream(rstream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));    
-    attachStreamFactory(pstream);
-    List<Tuple> tuples = getTuples(pstream);
+    try {
+      SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
+      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
 
-    assertEquals(3, tuples.size());
+      ReducerStream rstream = new ReducerStream(stream,
+          new FieldEqualitor("a_s"),
+          new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
+      ParallelStream pstream = parallelStream(rstream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+      attachStreamFactory(pstream);
+      pstream.setStreamContext(streamContext);
+      List<Tuple> tuples = getTuples(pstream);
 
-    Tuple t0 = tuples.get(0);
-    List<Map> maps0 = t0.getMaps("group");
-    assertMaps(maps0, 9, 1, 2, 0);
+      assertEquals(3, tuples.size());
 
-    Tuple t1 = tuples.get(1);
-    List<Map> maps1 = t1.getMaps("group");
-    assertMaps(maps1, 8, 7, 5, 3);
+      Tuple t0 = tuples.get(0);
+      List<Map> maps0 = t0.getMaps("group");
+      assertMaps(maps0, 9, 1, 2, 0);
 
-    Tuple t2 = tuples.get(2);
-    List<Map> maps2 = t2.getMaps("group");
-    assertMaps(maps2, 6, 4);
+      Tuple t1 = tuples.get(1);
+      List<Map> maps1 = t1.getMaps("group");
+      assertMaps(maps1, 8, 7, 5, 3);
 
-    //Test Descending with Ascending subsort
+      Tuple t2 = tuples.get(2);
+      List<Map> maps2 = t2.getMaps("group");
+      assertMaps(maps2, 6, 4);
 
-    sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
-    stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+      //Test Descending with Ascending subsort
 
-    rstream = new ReducerStream(stream,
-                                new FieldEqualitor("a_s"),
-                                new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 3));
-    pstream = parallelStream(rstream, new FieldComparator("a_s", ComparatorOrder.DESCENDING));
-    attachStreamFactory(pstream);
-    tuples = getTuples(pstream);
+      sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
+      stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
 
-    assertEquals(3, tuples.size());
+      rstream = new ReducerStream(stream,
+          new FieldEqualitor("a_s"),
+          new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 3));
+      pstream = parallelStream(rstream, new FieldComparator("a_s", ComparatorOrder.DESCENDING));
+      attachStreamFactory(pstream);
+      pstream.setStreamContext(streamContext);
+      tuples = getTuples(pstream);
 
-    t0 = tuples.get(0);
-    maps0 = t0.getMaps("group");
-    assertMaps(maps0, 4, 6);
+      assertEquals(3, tuples.size());
 
-    t1 = tuples.get(1);
-    maps1 = t1.getMaps("group");
-    assertMaps(maps1, 3, 5, 7);
+      t0 = tuples.get(0);
+      maps0 = t0.getMaps("group");
+      assertMaps(maps0, 4, 6);
 
-    t2 = tuples.get(2);
-    maps2 = t2.getMaps("group");
-    assertMaps(maps2, 0, 2, 1);
+      t1 = tuples.get(1);
+      maps1 = t1.getMaps("group");
+      assertMaps(maps1, 3, 5, 7);
 
+      t2 = tuples.get(2);
+      maps2 = t2.getMaps("group");
+      assertMaps(maps2, 0, 2, 1);
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -490,24 +569,33 @@ public void testTrace() throws Exception {
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
     //Test an error that comes originates from the /select handler
-    SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-    ExceptionStream estream = new ExceptionStream(stream);
-    Tuple t = getTuple(estream);
-    assertTrue(t.EOF);
-    assertTrue(t.EXCEPTION);
-    assertTrue(t.getException().contains("sort param field can't be found: blah"));
-
-    //Test an error that comes originates from the /export handler
-    sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt", "/export");
-    stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-    estream = new ExceptionStream(stream);
-    t = getTuple(estream);
-    assertTrue(t.EOF);
-    assertTrue(t.EXCEPTION);
-    //The /export handler will pass through a real exception.
-    assertTrue(t.getException().contains("undefined field:"));
+    try {
+      SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc");
+      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+      ExceptionStream estream = new ExceptionStream(stream);
+      estream.setStreamContext(streamContext);
+      Tuple t = getTuple(estream);
+      assertTrue(t.EOF);
+      assertTrue(t.EXCEPTION);
+      assertTrue(t.getException().contains("sort param field can't be found: blah"));
+
+      //Test an error that comes originates from the /export handler
+      sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt", "/export");
+      stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+      estream = new ExceptionStream(stream);
+      estream.setStreamContext(streamContext);
+      t = getTuple(estream);
+      assertTrue(t.EOF);
+      assertTrue(t.EXCEPTION);
+      //The /export handler will pass through a real exception.
+      assertTrue(t.getException().contains("undefined field:"));
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -577,48 +665,55 @@ public void testTrace() throws Exception {
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-    SolrParams sParamsA = mapParams("q", "*:*");
-
-    Metric[] metrics = {new SumMetric("a_i"),
-                        new SumMetric("a_f"),
-                        new MinMetric("a_i"),
-                        new MinMetric("a_f"),
-                        new MaxMetric("a_i"),
-                        new MaxMetric("a_f"),
-                        new MeanMetric("a_i"),
-                        new MeanMetric("a_f"),
-                        new CountMetric()};
-
-    StatsStream statsStream = new StatsStream(zkHost, COLLECTIONORALIAS, sParamsA, metrics);
-
-    List<Tuple> tuples = getTuples(statsStream);
-
-    assertEquals(1, tuples.size());
-
-    //Test Long and Double Sums
-
-    Tuple tuple = tuples.get(0);
-
-    Double sumi = tuple.getDouble("sum(a_i)");
-    Double sumf = tuple.getDouble("sum(a_f)");
-    Double mini = tuple.getDouble("min(a_i)");
-    Double minf = tuple.getDouble("min(a_f)");
-    Double maxi = tuple.getDouble("max(a_i)");
-    Double maxf = tuple.getDouble("max(a_f)");
-    Double avgi = tuple.getDouble("avg(a_i)");
-    Double avgf = tuple.getDouble("avg(a_f)");
-    Double count = tuple.getDouble("count(*)");
-
-    assertEquals(70, sumi.longValue());
-    assertEquals(55.0, sumf.doubleValue(), 0.01);
-    assertEquals(0.0, mini.doubleValue(), 0.01);
-    assertEquals(1.0, minf.doubleValue(), 0.01);
-    assertEquals(14.0, maxi.doubleValue(), 0.01);
-    assertEquals(10.0, maxf.doubleValue(), 0.01);
-    assertEquals(7.0, avgi.doubleValue(), .01);
-    assertEquals(5.5, avgf.doubleValue(), .001);
-    assertEquals(10, count.doubleValue(), .01);
-
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    try {
+      SolrParams sParamsA = mapParams("q", "*:*");
+
+      Metric[] metrics = {new SumMetric("a_i"),
+          new SumMetric("a_f"),
+          new MinMetric("a_i"),
+          new MinMetric("a_f"),
+          new MaxMetric("a_i"),
+          new MaxMetric("a_f"),
+          new MeanMetric("a_i"),
+          new MeanMetric("a_f"),
+          new CountMetric()};
+
+      StatsStream statsStream = new StatsStream(zkHost, COLLECTIONORALIAS, sParamsA, metrics);
+      statsStream.setStreamContext(streamContext);
+      List<Tuple> tuples = getTuples(statsStream);
+
+      assertEquals(1, tuples.size());
+
+      //Test Long and Double Sums
+
+      Tuple tuple = tuples.get(0);
+
+      Double sumi = tuple.getDouble("sum(a_i)");
+      Double sumf = tuple.getDouble("sum(a_f)");
+      Double mini = tuple.getDouble("min(a_i)");
+      Double minf = tuple.getDouble("min(a_f)");
+      Double maxi = tuple.getDouble("max(a_i)");
+      Double maxf = tuple.getDouble("max(a_f)");
+      Double avgi = tuple.getDouble("avg(a_i)");
+      Double avgf = tuple.getDouble("avg(a_f)");
+      Double count = tuple.getDouble("count(*)");
+
+      assertEquals(70, sumi.longValue());
+      assertEquals(55.0, sumf.doubleValue(), 0.01);
+      assertEquals(0.0, mini.doubleValue(), 0.01);
+      assertEquals(1.0, minf.doubleValue(), 0.01);
+      assertEquals(14.0, maxi.doubleValue(), 0.01);
+      assertEquals(10.0, maxf.doubleValue(), 0.01);
+      assertEquals(7.0, avgi.doubleValue(), .01);
+      assertEquals(5.5, avgf.doubleValue(), .001);
+      assertEquals(10, count.doubleValue(), .01);
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -637,344 +732,352 @@ public void testTrace() throws Exception {
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-    SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
-
-    Bucket[] buckets =  {new Bucket("a_s")};
-
-    Metric[] metrics = {new SumMetric("a_i"),
-                        new SumMetric("a_f"),
-                        new MinMetric("a_i"),
-                        new MinMetric("a_f"),
-                        new MaxMetric("a_i"),
-                        new MaxMetric("a_f"),
-                        new MeanMetric("a_i"),
-                        new MeanMetric("a_f"),
-                        new CountMetric()};
-
-    FieldComparator[] sorts = {new FieldComparator("sum(a_i)",
-                                                   ComparatorOrder.ASCENDING)};
-
-    FacetStream facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
-
-    List<Tuple> tuples = getTuples(facetStream);
-
-    assert(tuples.size() == 3);
-
-    //Test Long and Double Sums
-
-    Tuple tuple = tuples.get(0);
-    String bucket = tuple.getString("a_s");
-    Double sumi = tuple.getDouble("sum(a_i)");
-    Double sumf = tuple.getDouble("sum(a_f)");
-    Double mini = tuple.getDouble("min(a_i)");
-    Double minf = tuple.getDouble("min(a_f)");
-    Double maxi = tuple.getDouble("max(a_i)");
-    Double maxf = tuple.getDouble("max(a_f)");
-    Double avgi = tuple.getDouble("avg(a_i)");
-    Double avgf = tuple.getDouble("avg(a_f)");
-    Double count = tuple.getDouble("count(*)");
-
-    assertEquals("hello4", bucket);
-    assertEquals(15, sumi.longValue());
-    assertEquals(11.0, sumf.doubleValue(), 0.01);
-    assertEquals(4.0, mini.doubleValue(), 0.01);
-    assertEquals(4.0, minf.doubleValue(), 0.01);
-    assertEquals(11.0, maxi.doubleValue(), 0.01);
-    assertEquals(7.0, maxf.doubleValue(), 0.01);
-    assertEquals(7.5, avgi.doubleValue(), 0.01);
-    assertEquals(5.5, avgf.doubleValue(), 0.01);
-    assertEquals(2, count.doubleValue(), 0.01);
-
-    tuple = tuples.get(1);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello0", bucket);
-    assertEquals(17, sumi.doubleValue(), .01);
-    assertEquals(18, sumf.doubleValue(), .01);
-    assertEquals(0.0, mini.doubleValue(), .01);
-    assertEquals(1.0, minf.doubleValue(), .01);
-    assertEquals(14.0, maxi.doubleValue(), .01);
-    assertEquals(10.0, maxf.doubleValue(), .01);
-    assertEquals(4.25, avgi.doubleValue(), .01);
-    assertEquals(4.5, avgf.doubleValue(), .01);
-    assertEquals(4, count.doubleValue(), .01);
-
-    tuple = tuples.get(2);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello3", bucket);
-    assertEquals(38.0, sumi.doubleValue(), 0.01);
-    assertEquals(26.0, sumf.doubleValue(), 0.01);
-    assertEquals(3.0, mini.doubleValue(), 0.01);
-    assertEquals(3.0, minf.doubleValue(), 0.01);
-    assertEquals(13.0, maxi.doubleValue(), 0.01);
-    assertEquals(9.0, maxf.doubleValue(), 0.01);
-    assertEquals(9.5, avgi.doubleValue(), 0.01);
-    assertEquals(6.5, avgf.doubleValue(), 0.01);
-    assertEquals(4, count.doubleValue(), 0.01);
-
-
-    //Reverse the Sort.
-
-    sorts[0] = new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING);
-
-    facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
-
-    tuples = getTuples(facetStream);
-
-    assertEquals(3, tuples.size());
-
-    //Test Long and Double Sums
-
-    tuple = tuples.get(0);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello3", bucket);
-    assertEquals(38, sumi.doubleValue(), 0.1);
-    assertEquals(26, sumf.doubleValue(), 0.1);
-    assertEquals(3, mini.doubleValue(), 0.1);
-    assertEquals(3, minf.doubleValue(), 0.1);
-    assertEquals(13, maxi.doubleValue(), 0.1);
-    assertEquals(9, maxf.doubleValue(), 0.1);
-    assertEquals(9.5, avgi.doubleValue(), 0.1);
-    assertEquals(6.5, avgf.doubleValue(), 0.1);
-    assertEquals(4, count.doubleValue(), 0.1);
-
-    tuple = tuples.get(1);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello0", bucket);
-    assertEquals(17, sumi.doubleValue(), 0.01);
-    assertEquals(18, sumf.doubleValue(), 0.01);
-    assertEquals(0, mini.doubleValue(), 0.01);
-    assertEquals(1, minf.doubleValue(), 0.01);
-    assertEquals(14, maxi.doubleValue(), 0.01);
-    assertEquals(10, maxf.doubleValue(), 0.01);
-    assertEquals(4.25, avgi.doubleValue(), 0.01);
-    assertEquals(4.5, avgf.doubleValue(), 0.01);
-    assertEquals(4, count.doubleValue(), 0.01);
-
-    tuple = tuples.get(2);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello4", bucket);
-    assertEquals(15, sumi.longValue());
-    assertEquals(11, sumf.doubleValue(), 0.01);
-    assertEquals(4.0, mini.doubleValue(), 0.01);
-    assertEquals(4.0, minf.doubleValue(), 0.01);
-    assertEquals(11.0, maxi.doubleValue(), 0.01);
-    assertEquals(7.0, maxf.doubleValue(), 0.01);
-    assertEquals(7.5, avgi.doubleValue(), 0.01);
-    assertEquals(5.5, avgf.doubleValue(), 0.01);
-    assertEquals(2, count.doubleValue(), 0.01);
-
-
-    //Test index sort
-
-    sorts[0] = new FieldComparator("a_s", ComparatorOrder.DESCENDING);
-
-
-    facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
-
-    tuples = getTuples(facetStream);
-
-    assertEquals(3, tuples.size());
-
-
-    tuple = tuples.get(0);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-
-    assertEquals("hello4", bucket);
-    assertEquals(15, sumi.longValue());
-    assertEquals(11, sumf.doubleValue(), 0.01);
-    assertEquals(4, mini.doubleValue(), 0.01);
-    assertEquals(4, minf.doubleValue(), 0.01);
-    assertEquals(11, maxi.doubleValue(), 0.01);
-    assertEquals(7, maxf.doubleValue(), 0.01);
-    assertEquals(7.5, avgi.doubleValue(), 0.01);
-    assertEquals(5.5, avgf.doubleValue(), 0.01);
-    assertEquals(2, count.doubleValue(), 0.01);
-
-    tuple = tuples.get(1);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertTrue(bucket.equals("hello3"));
-    assertTrue(sumi.doubleValue() == 38.0D);
-    assertTrue(sumf.doubleValue() == 26.0D);
-    assertTrue(mini.doubleValue() == 3.0D);
-    assertTrue(minf.doubleValue() == 3.0D);
-    assertTrue(maxi.doubleValue() == 13.0D);
-    assertTrue(maxf.doubleValue() == 9.0D);
-    assertTrue(avgi.doubleValue() == 9.5D);
-    assertTrue(avgf.doubleValue() == 6.5D);
-    assertTrue(count.doubleValue() == 4);
-
-    tuple = tuples.get(2);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello0", bucket);
-    assertEquals(17, sumi.doubleValue(), 0.01);
-    assertEquals(18, sumf.doubleValue(), 0.01);
-    assertEquals(0, mini.doubleValue(), 0.01);
-    assertEquals(1, minf.doubleValue(), 0.01);
-    assertEquals(14, maxi.doubleValue(), 0.01);
-    assertEquals(10, maxf.doubleValue(), 0.01);
-    assertEquals(4.25, avgi.doubleValue(), 0.01);
-    assertEquals(4.5, avgf.doubleValue(), 0.01);
-    assertEquals(4, count.doubleValue(), 0.01);
-
-    //Test index sort
-
-    sorts[0] = new FieldComparator("a_s", ComparatorOrder.ASCENDING);
-
-    facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
-
-    tuples = getTuples(facetStream);
-
-    assertEquals(3, tuples.size());
-
-    tuple = tuples.get(0);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello0", bucket);
-    assertEquals(17, sumi.doubleValue(), 0.01);
-    assertEquals(18, sumf.doubleValue(), 0.01);
-    assertEquals(0, mini.doubleValue(), 0.01);
-    assertEquals(1, minf.doubleValue(), 0.01);
-    assertEquals(14, maxi.doubleValue(), 0.01);
-    assertEquals(10, maxf.doubleValue(), 0.01);
-    assertEquals(4.25, avgi.doubleValue(), 0.0001);
-    assertEquals(4.5, avgf.doubleValue(), 0.001);
-    assertEquals(4, count.doubleValue(), 0.01);
-
-    tuple = tuples.get(1);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello3", bucket);
-    assertEquals(38, sumi.doubleValue(), 0.01);
-    assertEquals(26, sumf.doubleValue(), 0.01);
-    assertEquals(3, mini.doubleValue(), 0.01);
-    assertEquals(3, minf.doubleValue(), 0.01);
-    assertEquals(13, maxi.doubleValue(), 0.01);
-    assertEquals(9, maxf.doubleValue(), 0.01);
-    assertEquals(9.5, avgi.doubleValue(), 0.01);
-    assertEquals(6.5, avgf.doubleValue(), 0.01);
-    assertEquals(4, count.doubleValue(), 0.01);
-
-    tuple = tuples.get(2);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello4", bucket);
-    assertEquals(15, sumi.longValue());
-    assertEquals(11.0, sumf.doubleValue(), 0.1);
-    assertEquals(4.0, mini.doubleValue(), 0.1);
-    assertEquals(4.0, minf.doubleValue(), 0.1);
-    assertEquals(11.0, maxi.doubleValue(), 0.1);
-    assertEquals(7.0, maxf.doubleValue(), 0.1);
-    assertEquals(7.5, avgi.doubleValue(), 0.1);
-    assertEquals(5.5, avgf.doubleValue(), 0.1);
-    assertEquals(2, count.doubleValue(), 0.1);
-
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    try {
+      SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
+
+      Bucket[] buckets = {new Bucket("a_s")};
+
+      Metric[] metrics = {new SumMetric("a_i"),
+          new SumMetric("a_f"),
+          new MinMetric("a_i"),
+          new MinMetric("a_f"),
+          new MaxMetric("a_i"),
+          new MaxMetric("a_f"),
+          new MeanMetric("a_i"),
+          new MeanMetric("a_f"),
+          new CountMetric()};
+
+      FieldComparator[] sorts = {new FieldComparator("sum(a_i)",
+          ComparatorOrder.ASCENDING)};
+
+      FacetStream facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
+
+      List<Tuple> tuples = getTuples(facetStream);
+
+      assert (tuples.size() == 3);
+
+      //Test Long and Double Sums
+
+      Tuple tuple = tuples.get(0);
+      String bucket = tuple.getString("a_s");
+      Double sumi = tuple.getDouble("sum(a_i)");
+      Double sumf = tuple.getDouble("sum(a_f)");
+      Double mini = tuple.getDouble("min(a_i)");
+      Double minf = tuple.getDouble("min(a_f)");
+      Double maxi = tuple.getDouble("max(a_i)");
+      Double maxf = tuple.getDouble("max(a_f)");
+      Double avgi = tuple.getDouble("avg(a_i)");
+      Double avgf = tuple.getDouble("avg(a_f)");
+      Double count = tuple.getDouble("count(*)");
+
+      assertEquals("hello4", bucket);
+      assertEquals(15, sumi.longValue());
+      assertEquals(11.0, sumf.doubleValue(), 0.01);
+      assertEquals(4.0, mini.doubleValue(), 0.01);
+      assertEquals(4.0, minf.doubleValue(), 0.01);
+      assertEquals(11.0, maxi.doubleValue(), 0.01);
+      assertEquals(7.0, maxf.doubleValue(), 0.01);
+      assertEquals(7.5, avgi.doubleValue(), 0.01);
+      assertEquals(5.5, avgf.doubleValue(), 0.01);
+      assertEquals(2, count.doubleValue(), 0.01);
+
+      tuple = tuples.get(1);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello0", bucket);
+      assertEquals(17, sumi.doubleValue(), .01);
+      assertEquals(18, sumf.doubleValue(), .01);
+      assertEquals(0.0, mini.doubleValue(), .01);
+      assertEquals(1.0, minf.doubleValue(), .01);
+      assertEquals(14.0, maxi.doubleValue(), .01);
+      assertEquals(10.0, maxf.doubleValue(), .01);
+      assertEquals(4.25, avgi.doubleValue(), .01);
+      assertEquals(4.5, avgf.doubleValue(), .01);
+      assertEquals(4, count.doubleValue(), .01);
+
+      tuple = tuples.get(2);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello3", bucket);
+      assertEquals(38.0, sumi.doubleValue(), 0.01);
+      assertEquals(26.0, sumf.doubleValue(), 0.01);
+      assertEquals(3.0, mini.doubleValue(), 0.01);
+      assertEquals(3.0, minf.doubleValue(), 0.01);
+      assertEquals(13.0, maxi.doubleValue(), 0.01);
+      assertEquals(9.0, maxf.doubleValue(), 0.01);
+      assertEquals(9.5, avgi.doubleValue(), 0.01);
+      assertEquals(6.5, avgf.doubleValue(), 0.01);
+      assertEquals(4, count.doubleValue(), 0.01);
+
+
+      //Reverse the Sort.
+
+      sorts[0] = new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING);
+
+      facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
+
+      tuples = getTuples(facetStream);
+
+      assertEquals(3, tuples.size());
+
+      //Test Long and Double Sums
+
+      tuple = tuples.get(0);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello3", bucket);
+      assertEquals(38, sumi.doubleValue(), 0.1);
+      assertEquals(26, sumf.doubleValue(), 0.1);
+      assertEquals(3, mini.doubleValue(), 0.1);
+      assertEquals(3, minf.doubleValue(), 0.1);
+      assertEquals(13, maxi.doubleValue(), 0.1);
+      assertEquals(9, maxf.doubleValue(), 0.1);
+      assertEquals(9.5, avgi.doubleValue(), 0.1);
+      assertEquals(6.5, avgf.doubleValue(), 0.1);
+      assertEquals(4, count.doubleValue(), 0.1);
+
+      tuple = tuples.get(1);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello0", bucket);
+      assertEquals(17, sumi.doubleValue(), 0.01);
+      assertEquals(18, sumf.doubleValue(), 0.01);
+      assertEquals(0, mini.doubleValue(), 0.01);
+      assertEquals(1, minf.doubleValue(), 0.01);
+      assertEquals(14, maxi.doubleValue(), 0.01);
+      assertEquals(10, maxf.doubleValue(), 0.01);
+      assertEquals(4.25, avgi.doubleValue(), 0.01);
+      assertEquals(4.5, avgf.doubleValue(), 0.01);
+      assertEquals(4, count.doubleValue(), 0.01);
+
+      tuple = tuples.get(2);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello4", bucket);
+      assertEquals(15, sumi.longValue());
+      assertEquals(11, sumf.doubleValue(), 0.01);
+      assertEquals(4.0, mini.doubleValue(), 0.01);
+      assertEquals(4.0, minf.doubleValue(), 0.01);
+      assertEquals(11.0, maxi.doubleValue(), 0.01);
+      assertEquals(7.0, maxf.doubleValue(), 0.01);
+      assertEquals(7.5, avgi.doubleValue(), 0.01);
+      assertEquals(5.5, avgf.doubleValue(), 0.01);
+      assertEquals(2, count.doubleValue(), 0.01);
+
+
+      //Test index sort
+
+      sorts[0] = new FieldComparator("a_s", ComparatorOrder.DESCENDING);
+
+
+      facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
+      facetStream.setStreamContext(streamContext);
+
+      tuples = getTuples(facetStream);
+
+      assertEquals(3, tuples.size());
+
+
+      tuple = tuples.get(0);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+
+      assertEquals("hello4", bucket);
+      assertEquals(15, sumi.longValue());
+      assertEquals(11, sumf.doubleValue(), 0.01);
+      assertEquals(4, mini.doubleValue(), 0.01);
+      assertEquals(4, minf.doubleValue(), 0.01);
+      assertEquals(11, maxi.doubleValue(), 0.01);
+      assertEquals(7, maxf.doubleValue(), 0.01);
+      assertEquals(7.5, avgi.doubleValue(), 0.01);
+      assertEquals(5.5, avgf.doubleValue(), 0.01);
+      assertEquals(2, count.doubleValue(), 0.01);
+
+      tuple = tuples.get(1);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertTrue(bucket.equals("hello3"));
+      assertTrue(sumi.doubleValue() == 38.0D);
+      assertTrue(sumf.doubleValue() == 26.0D);
+      assertTrue(mini.doubleValue() == 3.0D);
+      assertTrue(minf.doubleValue() == 3.0D);
+      assertTrue(maxi.doubleValue() == 13.0D);
+      assertTrue(maxf.doubleValue() == 9.0D);
+      assertTrue(avgi.doubleValue() == 9.5D);
+      assertTrue(avgf.doubleValue() == 6.5D);
+      assertTrue(count.doubleValue() == 4);
+
+      tuple = tuples.get(2);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello0", bucket);
+      assertEquals(17, sumi.doubleValue(), 0.01);
+      assertEquals(18, sumf.doubleValue(), 0.01);
+      assertEquals(0, mini.doubleValue(), 0.01);
+      assertEquals(1, minf.doubleValue(), 0.01);
+      assertEquals(14, maxi.doubleValue(), 0.01);
+      assertEquals(10, maxf.doubleValue(), 0.01);
+      assertEquals(4.25, avgi.doubleValue(), 0.01);
+      assertEquals(4.5, avgf.doubleValue(), 0.01);
+      assertEquals(4, count.doubleValue(), 0.01);
+
+      //Test index sort
+
+      sorts[0] = new FieldComparator("a_s", ComparatorOrder.ASCENDING);
+
+      facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
+      facetStream.setStreamContext(streamContext);
+      tuples = getTuples(facetStream);
+
+      assertEquals(3, tuples.size());
+
+      tuple = tuples.get(0);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello0", bucket);
+      assertEquals(17, sumi.doubleValue(), 0.01);
+      assertEquals(18, sumf.doubleValue(), 0.01);
+      assertEquals(0, mini.doubleValue(), 0.01);
+      assertEquals(1, minf.doubleValue(), 0.01);
+      assertEquals(14, maxi.doubleValue(), 0.01);
+      assertEquals(10, maxf.doubleValue(), 0.01);
+      assertEquals(4.25, avgi.doubleValue(), 0.0001);
+      assertEquals(4.5, avgf.doubleValue(), 0.001);
+      assertEquals(4, count.doubleValue(), 0.01);
+
+      tuple = tuples.get(1);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello3", bucket);
+      assertEquals(38, sumi.doubleValue(), 0.01);
+      assertEquals(26, sumf.doubleValue(), 0.01);
+      assertEquals(3, mini.doubleValue(), 0.01);
+      assertEquals(3, minf.doubleValue(), 0.01);
+      assertEquals(13, maxi.doubleValue(), 0.01);
+      assertEquals(9, maxf.doubleValue(), 0.01);
+      assertEquals(9.5, avgi.doubleValue(), 0.01);
+      assertEquals(6.5, avgf.doubleValue(), 0.01);
+      assertEquals(4, count.doubleValue(), 0.01);
+
+      tuple = tuples.get(2);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello4", bucket);
+      assertEquals(15, sumi.longValue());
+      assertEquals(11.0, sumf.doubleValue(), 0.1);
+      assertEquals(4.0, mini.doubleValue(), 0.1);
+      assertEquals(4.0, minf.doubleValue(), 0.1);
+      assertEquals(11.0, maxi.doubleValue(), 0.1);
+      assertEquals(7.0, maxf.doubleValue(), 0.1);
+      assertEquals(7.5, avgi.doubleValue(), 0.1);
+      assertEquals(5.5, avgf.doubleValue(), 0.1);
+      assertEquals(2, count.doubleValue(), 0.1);
+    } finally {
+      solrClientCache.close();
+    }
   }
 
 
@@ -1042,7 +1145,11 @@ public void testTrace() throws Exception {
     List<String> selectOrder = ("asc".equals(sortDir)) ? Arrays.asList(ascOrder) : Arrays.asList(descOrder);
     List<String> selectOrderBool = ("asc".equals(sortDir)) ? Arrays.asList(ascOrderBool) : Arrays.asList(descOrderBool);
     SolrParams exportParams = mapParams("q", "*:*", "qt", "/export", "fl", "id," + field, "sort", field + " " + sortDir + ",id asc");
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
     try (CloudSolrStream solrStream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, exportParams)) {
+      solrStream.setStreamContext(streamContext);
       List<Tuple> tuples = getTuples(solrStream);
       assertEquals("There should be exactly 32 responses returned", 32, tuples.size());
       // Since the getTuples method doesn't return the EOF tuple, these two entries should be the same size.
@@ -1053,6 +1160,8 @@ public void testTrace() throws Exception {
                 "' RESTORE GETTING selectOrder from select statement after LUCENE-7548",
             tuples.get(idx).getString("id"), (field.startsWith("b_") ? selectOrderBool.get(idx) : selectOrder.get(idx)));
       }
+    } finally {
+      solrClientCache.close();
     }
   }
 
@@ -1081,7 +1190,12 @@ public void testTrace() throws Exception {
     }
     SolrParams sParams = mapParams("q", "*:*", "qt", "/export", "fl", fl.toString(), "sort", "id asc");
 
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
     try (CloudSolrStream solrStream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams)) {
+      solrStream.setStreamContext(streamContext);
       List<Tuple> tuples = getTuples(solrStream);
       assertEquals("There should be exactly 32 responses returned", 32, tuples.size());
 
@@ -1097,6 +1211,8 @@ public void testTrace() throws Exception {
           }
         }
       }
+    } finally {
+      solrClientCache.close();
     }
   }
 
@@ -1229,173 +1345,181 @@ public void testTrace() throws Exception {
         .add(id, "9", "level1_s", "hello0", "level2_s", "b", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-    SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_i,a_f");
-
-    Bucket[] buckets =  {new Bucket("level1_s"), new Bucket("level2_s")};
-
-    Metric[] metrics = {new SumMetric("a_i"),
-                        new CountMetric()};
-
-    FieldComparator[] sorts = {new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING), new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING)};
-
-    FacetStream facetStream = new FacetStream(
-        zkHost,
-        COLLECTIONORALIAS,
-        sParamsA,
-        buckets,
-        metrics,
-        sorts,
-        100);
-
-    List<Tuple> tuples = getTuples(facetStream);
-    assertEquals(6, tuples.size());
-
-    Tuple tuple = tuples.get(0);
-    String bucket1 = tuple.getString("level1_s");
-    String bucket2 = tuple.getString("level2_s");
-    Double sumi = tuple.getDouble("sum(a_i)");
-    Double count = tuple.getDouble("count(*)");
-
-    assertEquals("hello3", bucket1);
-    assertEquals("b", bucket2);
-    assertEquals(35, sumi.longValue());
-    assertEquals(3, count, 0.1);
-
-    tuple = tuples.get(1);
-    bucket1 = tuple.getString("level1_s");
-    bucket2 = tuple.getString("level2_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello0", bucket1);
-    assertEquals("b", bucket2);
-    assertEquals(15, sumi.longValue());
-    assertEquals(2, count, 0.1);
-
-    tuple = tuples.get(2);
-    bucket1 = tuple.getString("level1_s");
-    bucket2 = tuple.getString("level2_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello4", bucket1);
-    assertEquals("b", bucket2);
-    assertEquals(11, sumi.longValue());
-    assertEquals(1, count.doubleValue(), 0.1);
-
-    tuple = tuples.get(3);
-    bucket1 = tuple.getString("level1_s");
-    bucket2 = tuple.getString("level2_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello4", bucket1);
-    assertEquals("a", bucket2);
-    assertEquals(4, sumi.longValue());
-    assertEquals(1, count.doubleValue(), 0.1);
-
-    tuple = tuples.get(4);
-    bucket1 = tuple.getString("level1_s");
-    bucket2 = tuple.getString("level2_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello3", bucket1);
-    assertEquals("a", bucket2);
-    assertEquals(3, sumi.longValue());
-    assertEquals(1, count.doubleValue(), 0.1);
-
-    tuple = tuples.get(5);
-    bucket1 = tuple.getString("level1_s");
-    bucket2 = tuple.getString("level2_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello0", bucket1);
-    assertEquals("a", bucket2);
-    assertEquals(2, sumi.longValue());
-    assertEquals(2, count.doubleValue(), 0.1);
-
-    sorts[0] =  new FieldComparator("level1_s", ComparatorOrder.DESCENDING );
-    sorts[1] =  new FieldComparator("level2_s", ComparatorOrder.DESCENDING );
-    facetStream = new FacetStream(
-        zkHost,
-        COLLECTIONORALIAS,
-        sParamsA,
-        buckets,
-        metrics,
-        sorts,
-        100);
-
-    tuples = getTuples(facetStream);
-    assertEquals(6, tuples.size());
-
-    tuple = tuples.get(0);
-    bucket1 = tuple.getString("level1_s");
-    bucket2 = tuple.getString("level2_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello4", bucket1);
-    assertEquals("b", bucket2);
-    assertEquals(11, sumi.longValue());
-    assertEquals(1, count, 0.1);
-
-    tuple = tuples.get(1);
-    bucket1 = tuple.getString("level1_s");
-    bucket2 = tuple.getString("level2_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello4", bucket1);
-    assertEquals("a", bucket2);
-    assertEquals(4, sumi.longValue());
-    assertEquals(1, count.doubleValue(), 0.1);
-
-    tuple = tuples.get(2);
-    bucket1 = tuple.getString("level1_s");
-    bucket2 = tuple.getString("level2_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello3", bucket1);
-    assertEquals("b", bucket2);
-    assertEquals(35, sumi.longValue());
-    assertEquals(3, count.doubleValue(), 0.1);
-
-    tuple = tuples.get(3);
-    bucket1 = tuple.getString("level1_s");
-    bucket2 = tuple.getString("level2_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello3", bucket1);
-    assertEquals("a", bucket2);
-    assertEquals(3, sumi.longValue());
-    assertEquals(1, count.doubleValue(), 0.1);
-
-    tuple = tuples.get(4);
-    bucket1 = tuple.getString("level1_s");
-    bucket2 = tuple.getString("level2_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello0", bucket1);
-    assertEquals("b", bucket2);
-    assertEquals(15, sumi.longValue());
-    assertEquals(2, count.doubleValue(), 0.1);
-
-    tuple = tuples.get(5);
-    bucket1 = tuple.getString("level1_s");
-    bucket2 = tuple.getString("level2_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello0", bucket1);
-    assertEquals("a", bucket2);
-    assertEquals(2, sumi.longValue());
-    assertEquals(2, count.doubleValue(), 0.1);
-
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    try {
+
+      SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_i,a_f");
+
+      Bucket[] buckets = {new Bucket("level1_s"), new Bucket("level2_s")};
+
+      Metric[] metrics = {new SumMetric("a_i"),
+          new CountMetric()};
+
+      FieldComparator[] sorts = {new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING), new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING)};
+
+      FacetStream facetStream = new FacetStream(
+          zkHost,
+          COLLECTIONORALIAS,
+          sParamsA,
+          buckets,
+          metrics,
+          sorts,
+          100);
+      facetStream.setStreamContext(streamContext);
+      List<Tuple> tuples = getTuples(facetStream);
+      assertEquals(6, tuples.size());
+
+      Tuple tuple = tuples.get(0);
+      String bucket1 = tuple.getString("level1_s");
+      String bucket2 = tuple.getString("level2_s");
+      Double sumi = tuple.getDouble("sum(a_i)");
+      Double count = tuple.getDouble("count(*)");
+
+      assertEquals("hello3", bucket1);
+      assertEquals("b", bucket2);
+      assertEquals(35, sumi.longValue());
+      assertEquals(3, count, 0.1);
+
+      tuple = tuples.get(1);
+      bucket1 = tuple.getString("level1_s");
+      bucket2 = tuple.getString("level2_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello0", bucket1);
+      assertEquals("b", bucket2);
+      assertEquals(15, sumi.longValue());
+      assertEquals(2, count, 0.1);
+
+      tuple = tuples.get(2);
+      bucket1 = tuple.getString("level1_s");
+      bucket2 = tuple.getString("level2_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello4", bucket1);
+      assertEquals("b", bucket2);
+      assertEquals(11, sumi.longValue());
+      assertEquals(1, count.doubleValue(), 0.1);
+
+      tuple = tuples.get(3);
+      bucket1 = tuple.getString("level1_s");
+      bucket2 = tuple.getString("level2_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello4", bucket1);
+      assertEquals("a", bucket2);
+      assertEquals(4, sumi.longValue());
+      assertEquals(1, count.doubleValue(), 0.1);
+
+      tuple = tuples.get(4);
+      bucket1 = tuple.getString("level1_s");
+      bucket2 = tuple.getString("level2_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello3", bucket1);
+      assertEquals("a", bucket2);
+      assertEquals(3, sumi.longValue());
+      assertEquals(1, count.doubleValue(), 0.1);
+
+      tuple = tuples.get(5);
+      bucket1 = tuple.getString("level1_s");
+      bucket2 = tuple.getString("level2_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello0", bucket1);
+      assertEquals("a", bucket2);
+      assertEquals(2, sumi.longValue());
+      assertEquals(2, count.doubleValue(), 0.1);
+
+      sorts[0] = new FieldComparator("level1_s", ComparatorOrder.DESCENDING);
+      sorts[1] = new FieldComparator("level2_s", ComparatorOrder.DESCENDING);
+      facetStream = new FacetStream(
+          zkHost,
+          COLLECTIONORALIAS,
+          sParamsA,
+          buckets,
+          metrics,
+          sorts,
+          100);
+      facetStream.setStreamContext(streamContext);
+      tuples = getTuples(facetStream);
+      assertEquals(6, tuples.size());
+
+      tuple = tuples.get(0);
+      bucket1 = tuple.getString("level1_s");
+      bucket2 = tuple.getString("level2_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello4", bucket1);
+      assertEquals("b", bucket2);
+      assertEquals(11, sumi.longValue());
+      assertEquals(1, count, 0.1);
+
+      tuple = tuples.get(1);
+      bucket1 = tuple.getString("level1_s");
+      bucket2 = tuple.getString("level2_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello4", bucket1);
+      assertEquals("a", bucket2);
+      assertEquals(4, sumi.longValue());
+      assertEquals(1, count.doubleValue(), 0.1);
+
+      tuple = tuples.get(2);
+      bucket1 = tuple.getString("level1_s");
+      bucket2 = tuple.getString("level2_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello3", bucket1);
+      assertEquals("b", bucket2);
+      assertEquals(35, sumi.longValue());
+      assertEquals(3, count.doubleValue(), 0.1);
+
+      tuple = tuples.get(3);
+      bucket1 = tuple.getString("level1_s");
+      bucket2 = tuple.getString("level2_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello3", bucket1);
+      assertEquals("a", bucket2);
+      assertEquals(3, sumi.longValue());
+      assertEquals(1, count.doubleValue(), 0.1);
+
+      tuple = tuples.get(4);
+      bucket1 = tuple.getString("level1_s");
+      bucket2 = tuple.getString("level2_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello0", bucket1);
+      assertEquals("b", bucket2);
+      assertEquals(15, sumi.longValue());
+      assertEquals(2, count.doubleValue(), 0.1);
+
+      tuple = tuples.get(5);
+      bucket1 = tuple.getString("level1_s");
+      bucket2 = tuple.getString("level2_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello0", bucket1);
+      assertEquals("a", bucket2);
+      assertEquals(2, sumi.longValue());
+      assertEquals(2, count.doubleValue(), 0.1);
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -1413,166 +1537,174 @@ public void testTrace() throws Exception {
         .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
-
-    SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-
-    Bucket[] buckets =  {new Bucket("a_s")};
-
-    Metric[] metrics = {new SumMetric("a_i"),
-                        new SumMetric("a_f"),
-                        new MinMetric("a_i"),
-                        new MinMetric("a_f"),
-                        new MaxMetric("a_i"),
-                        new MaxMetric("a_f"),
-                        new MeanMetric("a_i"),
-                        new MeanMetric("a_f"),
-                        new CountMetric()};
-
-    RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
-    List<Tuple> tuples = getTuples(rollupStream);
-
-    assert(tuples.size() == 3);
-
-    //Test Long and Double Sums
-
-    Tuple tuple = tuples.get(0);
-    String bucket = tuple.getString("a_s");
-    Double sumi = tuple.getDouble("sum(a_i)");
-    Double sumf = tuple.getDouble("sum(a_f)");
-    Double mini = tuple.getDouble("min(a_i)");
-    Double minf = tuple.getDouble("min(a_f)");
-    Double maxi = tuple.getDouble("max(a_i)");
-    Double maxf = tuple.getDouble("max(a_f)");
-    Double avgi = tuple.getDouble("avg(a_i)");
-    Double avgf = tuple.getDouble("avg(a_f)");
-    Double count = tuple.getDouble("count(*)");
-
-
-    assertEquals("hello0", bucket);
-    assertEquals(17, sumi.doubleValue(), 0.001);
-    assertEquals(18, sumf.doubleValue(), 0.001);
-    assertEquals(0, mini.doubleValue(), 0.001);
-    assertEquals(1, minf.doubleValue(), 0.001);
-    assertEquals(14, maxi.doubleValue(), 0.001);
-    assertEquals(10, maxf.doubleValue(), 0.001);
-    assertEquals(4.25, avgi.doubleValue(), 0.001);
-    assertEquals(4.5, avgf.doubleValue(), 0.001);
-    assertEquals(4, count.doubleValue(), 0.001);
-
-
-    tuple = tuples.get(1);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello3", bucket);
-    assertEquals(38, sumi.doubleValue(), 0.001);
-    assertEquals(26, sumf.doubleValue(), 0.001);
-    assertEquals(3, mini.doubleValue(), 0.001);
-    assertEquals(3, minf.doubleValue(), 0.001);
-    assertEquals(13, maxi.doubleValue(), 0.001);
-    assertEquals(9, maxf.doubleValue(), 0.001);
-    assertEquals(9.5, avgi.doubleValue(), 0.001);
-    assertEquals(6.5, avgf.doubleValue(), 0.001);
-    assertEquals(4, count.doubleValue(), 0.001);
-
-
-    tuple = tuples.get(2);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello4", bucket);
-    assertEquals(15, sumi.longValue());
-    assertEquals(11, sumf.doubleValue(), 0.01);
-    assertEquals(4, mini.doubleValue(), 0.01);
-    assertEquals(4, minf.doubleValue(), 0.01);
-    assertEquals(11, maxi.doubleValue(), 0.01);
-    assertEquals(7, maxf.doubleValue(), 0.01);
-    assertEquals(7.5, avgi.doubleValue(), 0.01);
-    assertEquals(5.5, avgf.doubleValue(), 0.01);
-    assertEquals(2, count.doubleValue(), 0.01);
-
-    // Test will null metrics
-    rollupStream = new RollupStream(stream, buckets, metrics);
-    tuples = getTuples(rollupStream);
-
-    assert(tuples.size() == 3);
-    tuple = tuples.get(0);
-    bucket = tuple.getString("a_s");
-    assertTrue(bucket.equals("hello0"));
-
-    tuple = tuples.get(1);
-    bucket = tuple.getString("a_s");
-    assertTrue(bucket.equals("hello3"));
-
-    tuple = tuples.get(2);
-    bucket = tuple.getString("a_s");
-    assertTrue(bucket.equals("hello4"));
-
-
-    //Test will null value in the grouping field
-    new UpdateRequest()
-        .add(id, "12", "a_s", null, "a_i", "14", "a_f", "10")
-        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
-
-    sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "qt", "/export");
-    stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-
-    Bucket[] buckets1 =  {new Bucket("a_s")};
-
-    Metric[] metrics1 = {new SumMetric("a_i"),
-        new SumMetric("a_f"),
-        new MinMetric("a_i"),
-        new MinMetric("a_f"),
-        new MaxMetric("a_i"),
-        new MaxMetric("a_f"),
-        new MeanMetric("a_i"),
-        new MeanMetric("a_f"),
-        new CountMetric()};
-
-    rollupStream = new RollupStream(stream, buckets1, metrics1);
-    tuples = getTuples(rollupStream);
-    //Check that we've got the extra NULL bucket
-    assertEquals(4, tuples.size());
-    tuple = tuples.get(0);
-    assertEquals("NULL", tuple.getString("a_s"));
-
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals(14, sumi.doubleValue(), 0.01);
-    assertEquals(10, sumf.doubleValue(), 0.01);
-    assertEquals(14, mini.doubleValue(), 0.01);
-    assertEquals(10, minf.doubleValue(), 0.01);
-    assertEquals(14, maxi.doubleValue(), 0.01);
-    assertEquals(10, maxf.doubleValue(), 0.01);
-    assertEquals(14, avgi.doubleValue(), 0.01);
-    assertEquals(10, avgf.doubleValue(), 0.01);
-    assertEquals(1, count.doubleValue(), 0.01);
-
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    try {
+      SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
+      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+
+      Bucket[] buckets = {new Bucket("a_s")};
+
+      Metric[] metrics = {new SumMetric("a_i"),
+          new SumMetric("a_f"),
+          new MinMetric("a_i"),
+          new MinMetric("a_f"),
+          new MaxMetric("a_i"),
+          new MaxMetric("a_f"),
+          new MeanMetric("a_i"),
+          new MeanMetric("a_f"),
+          new CountMetric()};
+
+      RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
+      rollupStream.setStreamContext(streamContext);
+      List<Tuple> tuples = getTuples(rollupStream);
+
+      assert (tuples.size() == 3);
+
+      //Test Long and Double Sums
+
+      Tuple tuple = tuples.get(0);
+      String bucket = tuple.getString("a_s");
+      Double sumi = tuple.getDouble("sum(a_i)");
+      Double sumf = tuple.getDouble("sum(a_f)");
+      Double mini = tuple.getDouble("min(a_i)");
+      Double minf = tuple.getDouble("min(a_f)");
+      Double maxi = tuple.getDouble("max(a_i)");
+      Double maxf = tuple.getDouble("max(a_f)");
+      Double avgi = tuple.getDouble("avg(a_i)");
+      Double avgf = tuple.getDouble("avg(a_f)");
+      Double count = tuple.getDouble("count(*)");
+
+
+      assertEquals("hello0", bucket);
+      assertEquals(17, sumi.doubleValue(), 0.001);
+      assertEquals(18, sumf.doubleValue(), 0.001);
+      assertEquals(0, mini.doubleValue(), 0.001);
+      assertEquals(1, minf.doubleValue(), 0.001);
+      assertEquals(14, maxi.doubleValue(), 0.001);
+      assertEquals(10, maxf.doubleValue(), 0.001);
+      assertEquals(4.25, avgi.doubleValue(), 0.001);
+      assertEquals(4.5, avgf.doubleValue(), 0.001);
+      assertEquals(4, count.doubleValue(), 0.001);
+
+
+      tuple = tuples.get(1);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello3", bucket);
+      assertEquals(38, sumi.doubleValue(), 0.001);
+      assertEquals(26, sumf.doubleValue(), 0.001);
+      assertEquals(3, mini.doubleValue(), 0.001);
+      assertEquals(3, minf.doubleValue(), 0.001);
+      assertEquals(13, maxi.doubleValue(), 0.001);
+      assertEquals(9, maxf.doubleValue(), 0.001);
+      assertEquals(9.5, avgi.doubleValue(), 0.001);
+      assertEquals(6.5, avgf.doubleValue(), 0.001);
+      assertEquals(4, count.doubleValue(), 0.001);
+
+
+      tuple = tuples.get(2);
+      bucket = tuple.getString("a_s");
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals("hello4", bucket);
+      assertEquals(15, sumi.longValue());
+      assertEquals(11, sumf.doubleValue(), 0.01);
+      assertEquals(4, mini.doubleValue(), 0.01);
+      assertEquals(4, minf.doubleValue(), 0.01);
+      assertEquals(11, maxi.doubleValue(), 0.01);
+      assertEquals(7, maxf.doubleValue(), 0.01);
+      assertEquals(7.5, avgi.doubleValue(), 0.01);
+      assertEquals(5.5, avgf.doubleValue(), 0.01);
+      assertEquals(2, count.doubleValue(), 0.01);
+
+      // Test will null metrics
+      rollupStream = new RollupStream(stream, buckets, metrics);
+      rollupStream.setStreamContext(streamContext);
+      tuples = getTuples(rollupStream);
+
+      assert (tuples.size() == 3);
+      tuple = tuples.get(0);
+      bucket = tuple.getString("a_s");
+      assertTrue(bucket.equals("hello0"));
+
+      tuple = tuples.get(1);
+      bucket = tuple.getString("a_s");
+      assertTrue(bucket.equals("hello3"));
+
+      tuple = tuples.get(2);
+      bucket = tuple.getString("a_s");
+      assertTrue(bucket.equals("hello4"));
+
+
+      //Test will null value in the grouping field
+      new UpdateRequest()
+          .add(id, "12", "a_s", null, "a_i", "14", "a_f", "10")
+          .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+      sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "qt", "/export");
+      stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+      Bucket[] buckets1 = {new Bucket("a_s")};
+
+      Metric[] metrics1 = {new SumMetric("a_i"),
+          new SumMetric("a_f"),
+          new MinMetric("a_i"),
+          new MinMetric("a_f"),
+          new MaxMetric("a_i"),
+          new MaxMetric("a_f"),
+          new MeanMetric("a_i"),
+          new MeanMetric("a_f"),
+          new CountMetric()};
+
+      rollupStream = new RollupStream(stream, buckets1, metrics1);
+      rollupStream.setStreamContext(streamContext);
+      tuples = getTuples(rollupStream);
+      //Check that we've got the extra NULL bucket
+      assertEquals(4, tuples.size());
+      tuple = tuples.get(0);
+      assertEquals("NULL", tuple.getString("a_s"));
+
+      sumi = tuple.getDouble("sum(a_i)");
+      sumf = tuple.getDouble("sum(a_f)");
+      mini = tuple.getDouble("min(a_i)");
+      minf = tuple.getDouble("min(a_f)");
+      maxi = tuple.getDouble("max(a_i)");
+      maxf = tuple.getDouble("max(a_f)");
+      avgi = tuple.getDouble("avg(a_i)");
+      avgf = tuple.getDouble("avg(a_f)");
+      count = tuple.getDouble("count(*)");
+
+      assertEquals(14, sumi.doubleValue(), 0.01);
+      assertEquals(10, sumf.doubleValue(), 0.01);
+      assertEquals(14, mini.doubleValue(), 0.01);
+      assertEquals(10, minf.doubleValue(), 0.01);
+      assertEquals(14, maxi.doubleValue(), 0.01);
+      assertEquals(10, maxf.doubleValue(), 0.01);
+      assertEquals(14, avgi.doubleValue(), 0.01);
+      assertEquals(10, avgf.doubleValue(), 0.01);
+      assertEquals(1, count.doubleValue(), 0.01);
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -1583,66 +1715,71 @@ public void testTrace() throws Exception {
     SolrClientCache cache = new SolrClientCache();
     context.setSolrClientCache(cache);
 
-    SolrParams sParams = mapParams("q", "a_s:hello0", "rows", "500", "fl", "id");
+    try {
+      SolrParams sParams = mapParams("q", "a_s:hello0", "rows", "500", "fl", "id");
 
-    TopicStream topicStream = new TopicStream(zkHost,
-        COLLECTIONORALIAS,
-        COLLECTIONORALIAS,
-                                              "50000000",
-                                              -1,
-                                              1000000, sParams);
+      TopicStream topicStream = new TopicStream(zkHost,
+          COLLECTIONORALIAS,
+          COLLECTIONORALIAS,
+          "50000000",
+          -1,
+          1000000, sParams);
 
-    DaemonStream daemonStream = new DaemonStream(topicStream, "daemon1", 1000, 500);
-    daemonStream.setStreamContext(context);
+      DaemonStream daemonStream = new DaemonStream(topicStream, "daemon1", 1000, 500);
+      daemonStream.setStreamContext(context);
 
-    daemonStream.open();
+      daemonStream.open();
 
-    // Wait for the checkpoint
-    JettySolrRunner jetty = cluster.getJettySolrRunners().get(0);
+      // Wait for the checkpoint
+      JettySolrRunner jetty = cluster.getJettySolrRunners().get(0);
 
 
-    SolrParams sParams1 = mapParams("qt", "/get", "ids", "50000000", "fl", "id");
-    int count = 0;
-    while(count == 0) {
-      SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/" + COLLECTIONORALIAS, sParams1);
-      List<Tuple> tuples = getTuples(solrStream);
-      count = tuples.size();
-      if(count > 0) {
-        Tuple t = tuples.get(0);
-        assertTrue(t.getLong("id") == 50000000);
-      } else {
-        System.out.println("###### Waiting for checkpoint #######:" + count);
+      SolrParams sParams1 = mapParams("qt", "/get", "ids", "50000000", "fl", "id");
+      int count = 0;
+      while (count == 0) {
+        SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/" + COLLECTIONORALIAS, sParams1);
+        solrStream.setStreamContext(context);
+        List<Tuple> tuples = getTuples(solrStream);
+        count = tuples.size();
+        if (count > 0) {
+          Tuple t = tuples.get(0);
+          assertTrue(t.getLong("id") == 50000000);
+        } else {
+          System.out.println("###### Waiting for checkpoint #######:" + count);
+        }
       }
-    }
 
-    new UpdateRequest()
-        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
-        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
-        .add(id, "3", "a_s", "hello0", "a_i", "3", "a_f", "3")
-        .add(id, "4", "a_s", "hello0", "a_i", "4", "a_f", "4")
-        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
-        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+      new UpdateRequest()
+          .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+          .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+          .add(id, "3", "a_s", "hello0", "a_i", "3", "a_f", "3")
+          .add(id, "4", "a_s", "hello0", "a_i", "4", "a_f", "4")
+          .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+          .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-    for(int i=0; i<5; i++) {
-      daemonStream.read();
-    }
+      for (int i = 0; i < 5; i++) {
+        daemonStream.read();
+      }
 
-    new UpdateRequest()
-        .add(id, "5", "a_s", "hello0", "a_i", "4", "a_f", "4")
-        .add(id, "6", "a_s", "hello0", "a_i", "4", "a_f", "4")
-        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+      new UpdateRequest()
+          .add(id, "5", "a_s", "hello0", "a_i", "4", "a_f", "4")
+          .add(id, "6", "a_s", "hello0", "a_i", "4", "a_f", "4")
+          .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-    for(int i=0; i<2; i++) {
-      daemonStream.read();
-    }
+      for (int i = 0; i < 2; i++) {
+        daemonStream.read();
+      }
+
+      daemonStream.shutdown();
 
-    daemonStream.shutdown();
+      Tuple tuple = daemonStream.read();
 
-    Tuple tuple = daemonStream.read();
+      assertTrue(tuple.EOF);
+      daemonStream.close();
+    } finally {
+      cache.close();
+    }
 
-    assertTrue(tuple.EOF);
-    daemonStream.close();
-    cache.close();
 
   }
 
@@ -1662,99 +1799,107 @@ public void testTrace() throws Exception {
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-    SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "partitionKeys", "a_s");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-
-    Bucket[] buckets =  {new Bucket("a_s")};
-
-    Metric[] metrics = {new SumMetric("a_i"),
-                        new SumMetric("a_f"),
-                        new MinMetric("a_i"),
-                        new MinMetric("a_f"),
-                        new MaxMetric("a_i"),
-                        new MaxMetric("a_f"),
-                        new MeanMetric("a_i"),
-                        new MeanMetric("a_f"),
-                        new CountMetric()};
-
-    RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
-    ParallelStream parallelStream = parallelStream(rollupStream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
-    attachStreamFactory(parallelStream);
-    List<Tuple> tuples = getTuples(parallelStream);
-
-    assertEquals(3, tuples.size());
-
-    //Test Long and Double Sums
-
-    Tuple tuple = tuples.get(0);
-    String bucket = tuple.getString("a_s");
-    Double sumi = tuple.getDouble("sum(a_i)");
-    Double sumf = tuple.getDouble("sum(a_f)");
-    Double mini = tuple.getDouble("min(a_i)");
-    Double minf = tuple.getDouble("min(a_f)");
-    Double maxi = tuple.getDouble("max(a_i)");
-    Double maxf = tuple.getDouble("max(a_f)");
-    Double avgi = tuple.getDouble("avg(a_i)");
-    Double avgf = tuple.getDouble("avg(a_f)");
-    Double count = tuple.getDouble("count(*)");
-
-    assertEquals("hello0", bucket);
-    assertEquals(17, sumi.doubleValue(), 0.001);
-    assertEquals(18, sumf.doubleValue(), 0.001);
-    assertEquals(0, mini.doubleValue(), 0.001);
-    assertEquals(1, minf.doubleValue(), 0.001);
-    assertEquals(14, maxi.doubleValue(), 0.001);
-    assertEquals(10, maxf.doubleValue(), 0.001);
-    assertEquals(4.25, avgi.doubleValue(), 0.001);
-    assertEquals(4.5, avgf.doubleValue(), 0.001);
-    assertEquals(4, count.doubleValue(), 0.001);
-
-    tuple = tuples.get(1);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello3", bucket);
-    assertEquals(38, sumi.doubleValue(), 0.001);
-    assertEquals(26, sumf.doubleValue(), 0.001);
-    assertEquals(3, mini.doubleValue(), 0.001);
-    assertEquals(3, minf.doubleValue(), 0.001);
-    assertEquals(13, maxi.doubleValue(), 0.001);
-    assertEquals(9, maxf.doubleValue(), 0.001);
-    assertEquals(9.5, avgi.doubleValue(), 0.001);
-    assertEquals(6.5, avgf.doubleValue(), 0.001);
-    assertEquals(4, count.doubleValue(), 0.001);
-
-    tuple = tuples.get(2);
-    bucket = tuple.getString("a_s");
-    sumi = tuple.getDouble("sum(a_i)");
-    sumf = tuple.getDouble("sum(a_f)");
-    mini = tuple.getDouble("min(a_i)");
-    minf = tuple.getDouble("min(a_f)");
-    maxi = tuple.getDouble("max(a_i)");
-    maxf = tuple.getDouble("max(a_f)");
-    avgi = tuple.getDouble("avg(a_i)");
-    avgf = tuple.getDouble("avg(a_f)");
-    count = tuple.getDouble("count(*)");
-
-    assertEquals("hello4", bucket);
-    assertEquals(15, sumi.longValue());
-    assertEquals(11, sumf.doubleValue(), 0.001);
-    assertEq

<TRUNCATED>