You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2017/04/12 02:47:23 UTC

[3/4] lucene-solr:branch_6x: SOLR-10274: Fix backport

SOLR-10274: Fix backport


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/b03a7b1c
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/b03a7b1c
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/b03a7b1c

Branch: refs/heads/branch_6x
Commit: b03a7b1cfc9939206c8c802b3dca8ecbb6c2e94f
Parents: 740d967
Author: Joel Bernstein <jb...@apache.org>
Authored: Tue Apr 11 15:17:03 2017 -0400
Committer: Joel Bernstein <jb...@apache.org>
Committed: Tue Apr 11 22:41:48 2017 -0400

----------------------------------------------------------------------
 .../client/solrj/io/stream/CloudSolrStream.java |   41 +-
 .../client/solrj/io/stream/ParallelStream.java  |   27 +-
 .../client/solrj/io/stream/StreamContext.java   |    4 +
 .../client/solrj/io/stream/JDBCStreamTest.java  |  308 +-
 .../io/stream/SelectWithEvaluatorsTest.java     |   37 +-
 .../solrj/io/stream/StreamExpressionTest.java   | 3783 ++++++++++--------
 .../client/solrj/io/stream/StreamingTest.java   | 2505 ++++++------
 7 files changed, 3652 insertions(+), 3053 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b03a7b1c/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
index 7161dc4..6d1764a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
@@ -26,8 +26,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -35,7 +33,6 @@ import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
 import org.apache.solr.client.solrj.io.comp.FieldComparator;
@@ -52,9 +49,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 import org.apache.solr.common.cloud.Aliases;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.MapSolrParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
@@ -178,9 +173,11 @@ public class CloudSolrStream extends TupleStream implements Expressible {
     else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
       zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
     }
+    /*
     if(null == zkHost){
       throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
     }
+    */
     
     // We've got all the required items
     init(collectionName, zkHost, mParams);
@@ -299,14 +296,6 @@ public class CloudSolrStream extends TupleStream implements Expressible {
     this.tuples = new TreeSet();
     this.solrStreams = new ArrayList();
     this.eofTuples = Collections.synchronizedMap(new HashMap());
-    if (this.streamContext != null && this.streamContext.getSolrClientCache() != null) {
-      this.cloudSolrClient = this.streamContext.getSolrClientCache().getCloudSolrClient(zkHost);
-    } else {
-      this.cloudSolrClient = new Builder()
-          .withZkHost(zkHost)
-          .build();
-      this.cloudSolrClient.connect();
-    }
     constructStreams();
     openStreams();
   }
@@ -400,29 +389,15 @@ public class CloudSolrStream extends TupleStream implements Expressible {
 
   protected void constructStreams() throws IOException {
     try {
-      ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
-      ClusterState clusterState = zkStateReader.getClusterState();
 
-      Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, true);
+      List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);
 
       ModifiableSolrParams mParams = new ModifiableSolrParams(params);
       mParams = adjustParams(mParams);
       mParams.set(DISTRIB, "false"); // We are the aggregator.
 
-      Set<String> liveNodes = clusterState.getLiveNodes();
-      for(Slice slice : slices) {
-        Collection<Replica> replicas = slice.getReplicas();
-        List<Replica> shuffler = new ArrayList<>();
-        for(Replica replica : replicas) {
-          if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
-          shuffler.add(replica);
-        }
-
-        Collections.shuffle(shuffler, new Random());
-        Replica rep = shuffler.get(0);
-        ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
-        String url = zkProps.getCoreUrl();
-        SolrStream solrStream = new SolrStream(url, mParams);
+      for(String shardUrl : shardUrls) {
+        SolrStream solrStream = new SolrStream(shardUrl, mParams);
         if(streamContext != null) {
           solrStream.setStreamContext(streamContext);
         }
@@ -468,12 +443,6 @@ public class CloudSolrStream extends TupleStream implements Expressible {
         solrStream.close();
       }
     }
-
-    if ((this.streamContext == null || this.streamContext.getSolrClientCache() == null) &&
-        cloudSolrClient != null) {
-
-      cloudSolrClient.close();
-    }
   }
   
   /** Return the stream sort - ie, the order in which records are returned */

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b03a7b1c/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
index 87e1354..def4c03 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
@@ -263,27 +263,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
     try {
       Object pushStream = ((Expressible) tupleStream).toExpression(streamFactory);
 
-      ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
-
-      Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, true);
-
-      ClusterState clusterState = zkStateReader.getClusterState();
-      Set<String> liveNodes = clusterState.getLiveNodes();
-
-      List<Replica> shuffler = new ArrayList<>();
-      for(Slice slice : slices) {
-        Collection<Replica> replicas = slice.getReplicas();
-        for (Replica replica : replicas) {
-          if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
-          shuffler.add(replica);
-        }
-      }
-
-      if(workers > shuffler.size()) {
-        throw new IOException("Number of workers exceeds nodes in the worker collection");
-      }
-
-      Collections.shuffle(shuffler, new Random());
+      List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);
 
       for(int w=0; w<workers; w++) {
         ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
@@ -293,9 +273,8 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
 
         paramsLoc.set("expr", pushStream.toString());
         paramsLoc.set("qt","/stream");
-        Replica rep = shuffler.get(w);
-        ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
-        String url = zkProps.getCoreUrl();
+
+        String url = shardUrls.get(w);
         SolrStream solrStream = new SolrStream(url, paramsLoc);
         solrStreams.add(solrStream);
       }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b03a7b1c/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
index 6cbf090..d1460ea 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
@@ -50,6 +50,10 @@ public class StreamContext implements Serializable{
     this.entries.put(key, value);
   }
 
+  public boolean containsKey(Object key) {
+    return entries.containsKey(key);
+  }
+
   public Map getEntries() {
     return this.entries;
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b03a7b1c/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
index e55c837..9fff33a 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Locale;
 
 import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
 import org.apache.solr.client.solrj.io.comp.FieldComparator;
@@ -205,6 +206,10 @@ public class JDBCStreamTest extends SolrCloudTestCase {
       statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NO', 'Norway')");
       statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('AL', 'Algeria')");
     }
+
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
     
     // Load Solr
     new UpdateRequest()
@@ -217,18 +222,25 @@ public class JDBCStreamTest extends SolrCloudTestCase {
       .withFunctionName("search", CloudSolrStream.class);
     
     List<Tuple> tuples;
-    
-    // Simple 1
-    TupleStream jdbcStream = new JDBCStream("jdbc:hsqldb:mem:.", "select CODE,COUNTRY_NAME from COUNTRIES order by CODE", new FieldComparator("CODE", ComparatorOrder.ASCENDING));
-    TupleStream selectStream = new SelectStream(jdbcStream, new HashMap<String, String>(){{ put("CODE", "code_s"); put("COUNTRY_NAME", "name_s"); }});
-    TupleStream searchStream = factory.constructStream("search(" + COLLECTIONORALIAS + ", fl=\"code_s,name_s\",q=\"*:*\",sort=\"code_s asc\")");
-    TupleStream mergeStream = new MergeStream(new FieldComparator("code_s", ComparatorOrder.ASCENDING), new TupleStream[]{selectStream,searchStream});
-    
-    tuples = getTuples(mergeStream);
-    
-    assertEquals(7, tuples.size());
-    assertOrderOf(tuples, "code_s", "AL","CA","GB","NL","NO","NP","US");
-    assertOrderOf(tuples, "name_s", "Algeria", "Canada", "Great Britian", "Netherlands", "Norway", "Nepal", "United States");
+
+    try {
+      // Simple 1
+      TupleStream jdbcStream = new JDBCStream("jdbc:hsqldb:mem:.", "select CODE,COUNTRY_NAME from COUNTRIES order by CODE", new FieldComparator("CODE", ComparatorOrder.ASCENDING));
+      TupleStream selectStream = new SelectStream(jdbcStream, new HashMap<String, String>() {{
+        put("CODE", "code_s");
+        put("COUNTRY_NAME", "name_s");
+      }});
+      TupleStream searchStream = factory.constructStream("search(" + COLLECTIONORALIAS + ", fl=\"code_s,name_s\",q=\"*:*\",sort=\"code_s asc\")");
+      TupleStream mergeStream = new MergeStream(new FieldComparator("code_s", ComparatorOrder.ASCENDING), new TupleStream[]{selectStream, searchStream});
+      mergeStream.setStreamContext(streamContext);
+      tuples = getTuples(mergeStream);
+
+      assertEquals(7, tuples.size());
+      assertOrderOf(tuples, "code_s", "AL", "CA", "GB", "NL", "NO", "NP", "US");
+      assertOrderOf(tuples, "name_s", "Algeria", "Canada", "Great Britian", "Netherlands", "Norway", "Nepal", "United States");
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -277,32 +289,41 @@ public class JDBCStreamTest extends SolrCloudTestCase {
     String expression;
     TupleStream stream;
     List<Tuple> tuples;
-    
-    // Basic test
-    expression =   
-              "innerJoin("
-            + "  select("
-            + "    search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
-            + "    personId_i as personId,"
-            + "    rating_f as rating"
-            + "  ),"
-            + "  select("
-            + "    jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\"),"
-            + "    ID as personId,"
-            + "    NAME as personName,"
-            + "    COUNTRY_NAME as country"
-            + "  ),"
-            + "  on=\"personId\""
-            + ")";
-
-    stream = factory.constructStream(expression);
-    tuples = getTuples(stream);
-    
-    assertEquals(10, tuples.size());
-    assertOrderOf(tuples, "personId", 11,12,13,14,15,16,17,18,19,20);
-    assertOrderOf(tuples, "rating", 3.5d,5d,2.2d,4.3d,3.5d,3d,3d,4d,4.1d,4.8d);
-    assertOrderOf(tuples, "personName", "Emma","Grace","Hailey","Isabella","Lily","Madison","Mia","Natalie","Olivia","Samantha");
-    assertOrderOf(tuples, "country", "Netherlands","United States","Netherlands","Netherlands","Netherlands","United States","United States","Netherlands","Netherlands","United States");
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    try {
+      // Basic test
+      expression =
+          "innerJoin("
+              + "  select("
+              + "    search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+              + "    personId_i as personId,"
+              + "    rating_f as rating"
+              + "  ),"
+              + "  select("
+              + "    jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\"),"
+              + "    ID as personId,"
+              + "    NAME as personName,"
+              + "    COUNTRY_NAME as country"
+              + "  ),"
+              + "  on=\"personId\""
+              + ")";
+
+
+      stream = factory.constructStream(expression);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assertEquals(10, tuples.size());
+      assertOrderOf(tuples, "personId", 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
+      assertOrderOf(tuples, "rating", 3.5d, 5d, 2.2d, 4.3d, 3.5d, 3d, 3d, 4d, 4.1d, 4.8d);
+      assertOrderOf(tuples, "personName", "Emma", "Grace", "Hailey", "Isabella", "Lily", "Madison", "Mia", "Natalie", "Olivia", "Samantha");
+      assertOrderOf(tuples, "country", "Netherlands", "United States", "Netherlands", "Netherlands", "Netherlands", "United States", "United States", "Netherlands", "Netherlands", "United States");
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -351,58 +372,67 @@ public class JDBCStreamTest extends SolrCloudTestCase {
     String expression;
     TupleStream stream;
     List<Tuple> tuples;
-    
-    // Basic test for no alias
-    expression =
-              "innerJoin("
-            + "  select("
-            + "    search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
-            + "    personId_i as personId,"
-            + "    rating_f as rating"
-            + "  ),"
-            + "  select("
-            + "    jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\"),"
-            + "    ID as personId,"
-            + "    NAME as personName,"
-            + "    COUNTRY_NAME as country"
-            + "  ),"
-            + "  on=\"personId\""
-            + ")";
-
-    stream = factory.constructStream(expression);
-    tuples = getTuples(stream);
-    
-    assertEquals(10, tuples.size());
-    assertOrderOf(tuples, "personId", 11,12,13,14,15,16,17,18,19,20);
-    assertOrderOf(tuples, "rating", 3.5d,5d,2.2d,4.3d,3.5d,3d,3d,4d,4.1d,4.8d);
-    assertOrderOf(tuples, "personName", "Emma","Grace","Hailey","Isabella","Lily","Madison","Mia","Natalie","Olivia","Samantha");
-    assertOrderOf(tuples, "country", "Netherlands","United States","Netherlands","Netherlands","Netherlands","United States","United States","Netherlands","Netherlands","United States");
-    
-    // Basic test for alias
-    expression =   
-              "innerJoin("
-            + "  select("
-            + "    search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
-            + "    personId_i as personId,"
-            + "    rating_f as rating"
-            + "  ),"
-            + "  select("
-            + "    jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID as PERSONID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"PERSONID asc\"),"
-            + "    PERSONID as personId,"
-            + "    NAME as personName,"
-            + "    COUNTRY_NAME as country"
-            + "  ),"
-            + "  on=\"personId\""
-            + ")";
-
-    stream = factory.constructStream(expression);
-    tuples = getTuples(stream);
-    
-    assertEquals(10, tuples.size());
-    assertOrderOf(tuples, "personId", 11,12,13,14,15,16,17,18,19,20);
-    assertOrderOf(tuples, "rating", 3.5d,5d,2.2d,4.3d,3.5d,3d,3d,4d,4.1d,4.8d);
-    assertOrderOf(tuples, "personName", "Emma","Grace","Hailey","Isabella","Lily","Madison","Mia","Natalie","Olivia","Samantha");
-    assertOrderOf(tuples, "country", "Netherlands","United States","Netherlands","Netherlands","Netherlands","United States","United States","Netherlands","Netherlands","United States");
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    try {
+      // Basic test for no alias
+      expression =
+          "innerJoin("
+              + "  select("
+              + "    search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+              + "    personId_i as personId,"
+              + "    rating_f as rating"
+              + "  ),"
+              + "  select("
+              + "    jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\"),"
+              + "    ID as personId,"
+              + "    NAME as personName,"
+              + "    COUNTRY_NAME as country"
+              + "  ),"
+              + "  on=\"personId\""
+              + ")";
+
+      stream = factory.constructStream(expression);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assertEquals(10, tuples.size());
+      assertOrderOf(tuples, "personId", 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
+      assertOrderOf(tuples, "rating", 3.5d, 5d, 2.2d, 4.3d, 3.5d, 3d, 3d, 4d, 4.1d, 4.8d);
+      assertOrderOf(tuples, "personName", "Emma", "Grace", "Hailey", "Isabella", "Lily", "Madison", "Mia", "Natalie", "Olivia", "Samantha");
+      assertOrderOf(tuples, "country", "Netherlands", "United States", "Netherlands", "Netherlands", "Netherlands", "United States", "United States", "Netherlands", "Netherlands", "United States");
+
+      // Basic test for alias
+      expression =
+          "innerJoin("
+              + "  select("
+              + "    search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+              + "    personId_i as personId,"
+              + "    rating_f as rating"
+              + "  ),"
+              + "  select("
+              + "    jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID as PERSONID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"PERSONID asc\"),"
+              + "    PERSONID as personId,"
+              + "    NAME as personName,"
+              + "    COUNTRY_NAME as country"
+              + "  ),"
+              + "  on=\"personId\""
+              + ")";
+
+      stream = factory.constructStream(expression);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assertEquals(10, tuples.size());
+      assertOrderOf(tuples, "personId", 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
+      assertOrderOf(tuples, "rating", 3.5d, 5d, 2.2d, 4.3d, 3.5d, 3d, 3d, 4d, 4.1d, 4.8d);
+      assertOrderOf(tuples, "personName", "Emma", "Grace", "Hailey", "Isabella", "Lily", "Madison", "Mia", "Natalie", "Olivia", "Samantha");
+      assertOrderOf(tuples, "country", "Netherlands", "United States", "Netherlands", "Netherlands", "Netherlands", "United States", "United States", "Netherlands", "Netherlands", "United States");
+    } finally {
+      solrClientCache.close();
+    }
   }
 
   @Test
@@ -439,7 +469,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
       statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (19,'Olivia','NL')");
       statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (20,'Samantha','US')");
     }
-    
+
     // Load solr data
     new UpdateRequest()
         .add(id, "1", "rating_f", "3.5", "personId_i", "11")
@@ -457,50 +487,58 @@ public class JDBCStreamTest extends SolrCloudTestCase {
     String expression;
     TupleStream stream;
     List<Tuple> tuples;
-    
-    // Basic test
-    expression =   
-              "rollup("
-            + "  hashJoin("
-            + "    hashed=select("
-            + "      search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
-            + "      personId_i as personId,"
-            + "      rating_f as rating"
-            + "    ),"
-            + "    select("
-            + "      jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by COUNTRIES.COUNTRY_NAME\", sort=\"COUNTRIES.COUNTRY_NAME asc\"),"
-            + "      ID as personId,"
-            + "      NAME as personName,"
-            + "      COUNTRY_NAME as country"
-            + "    ),"
-            + "    on=\"personId\""
-            + "  ),"
-            + "  over=\"country\","
-            + "  max(rating),"
-            + "  min(rating),"
-            + "  avg(rating),"
-            + "  count(*)"
-            + ")";
-
-    stream = factory.constructStream(expression);
-    tuples = getTuples(stream);
-    
-    assertEquals(2, tuples.size());
-    
-    Tuple tuple = tuples.get(0);
-    assertEquals("Netherlands",tuple.getString("country"));
-    assertTrue(4.3D == tuple.getDouble("max(rating)"));
-    assertTrue(2.2D == tuple.getDouble("min(rating)"));
-    assertTrue(3.6D == tuple.getDouble("avg(rating)"));
-    assertTrue(6D == tuple.getDouble("count(*)"));
-    
-    tuple = tuples.get(1);
-    assertEquals("United States",tuple.getString("country"));
-    assertTrue(5D == tuple.getDouble("max(rating)"));
-    assertTrue(3D == tuple.getDouble("min(rating)"));
-    assertTrue(3.95D == tuple.getDouble("avg(rating)"));
-    assertTrue(4D == tuple.getDouble("count(*)"));
-    
+
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    try {
+      // Basic test
+      expression =
+          "rollup("
+              + "  hashJoin("
+              + "    hashed=select("
+              + "      search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+              + "      personId_i as personId,"
+              + "      rating_f as rating"
+              + "    ),"
+              + "    select("
+              + "      jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by COUNTRIES.COUNTRY_NAME\", sort=\"COUNTRIES.COUNTRY_NAME asc\"),"
+              + "      ID as personId,"
+              + "      NAME as personName,"
+              + "      COUNTRY_NAME as country"
+              + "    ),"
+              + "    on=\"personId\""
+              + "  ),"
+              + "  over=\"country\","
+              + "  max(rating),"
+              + "  min(rating),"
+              + "  avg(rating),"
+              + "  count(*)"
+              + ")";
+
+      stream = factory.constructStream(expression);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assertEquals(2, tuples.size());
+
+      Tuple tuple = tuples.get(0);
+      assertEquals("Netherlands", tuple.getString("country"));
+      assertTrue(4.3D == tuple.getDouble("max(rating)"));
+      assertTrue(2.2D == tuple.getDouble("min(rating)"));
+      assertTrue(3.6D == tuple.getDouble("avg(rating)"));
+      assertTrue(6D == tuple.getDouble("count(*)"));
+
+      tuple = tuples.get(1);
+      assertEquals("United States", tuple.getString("country"));
+      assertTrue(5D == tuple.getDouble("max(rating)"));
+      assertTrue(3D == tuple.getDouble("min(rating)"));
+      assertTrue(3.95D == tuple.getDouble("avg(rating)"));
+      assertTrue(4D == tuple.getDouble("count(*)"));
+    } finally {
+      solrClientCache.close();
+    }
   }
   
   @Test(expected=IOException.class)

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b03a7b1c/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
index b91df8d..75bf92d 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.eval.AddEvaluator;
 import org.apache.solr.client.solrj.io.eval.GreaterThanEvaluator;
@@ -92,6 +93,9 @@ public class SelectWithEvaluatorsTest extends SolrCloudTestCase {
     String clause;
     TupleStream stream;
     List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
     
     StreamFactory factory = new StreamFactory()
       .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
@@ -101,21 +105,24 @@ public class SelectWithEvaluatorsTest extends SolrCloudTestCase {
       .withFunctionName("if", IfThenElseEvaluator.class)
       .withFunctionName("gt", GreaterThanEvaluator.class)
       ;
-    
-    // Basic test
-    clause = "select("
-            +   "id,"
-            +   "add(b_i,c_d) as result,"
-            +   "search(collection1, q=*:*, fl=\"id,a_s,b_i,c_d,d_b\", sort=\"id asc\")"
-            + ")";
-    stream = factory.constructStream(clause);
-    tuples = getTuples(stream);
-    assertFields(tuples, "id", "result");
-    assertNotFields(tuples, "a_s", "b_i", "c_d", "d_b");
-    assertEquals(1, tuples.size());
-    assertDouble(tuples.get(0), "result", 4.3);
-    assertEquals(4.3, tuples.get(0).get("result"));
-
+    try {
+      // Basic test
+      clause = "select("
+          + "id,"
+          + "add(b_i,c_d) as result,"
+          + "search(collection1, q=*:*, fl=\"id,a_s,b_i,c_d,d_b\", sort=\"id asc\")"
+          + ")";
+      stream = factory.constructStream(clause);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assertFields(tuples, "id", "result");
+      assertNotFields(tuples, "a_s", "b_i", "c_d", "d_b");
+      assertEquals(1, tuples.size());
+      assertDouble(tuples.get(0), "result", 4.3);
+      assertEquals(4.3, tuples.get(0).get("result"));
+    } finally {
+      solrClientCache.close();
+    }
   }
   
   protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {